常见的并发任务
单例模式 (只执行一次) sync.Once
保证在多线程的情况下 某段代码仅执⾏⼀次
// 单例模式 (懒汉式,线程安全)
type Singleton struct {
}
var singleInstance *Singleton
var once sync.Once
func GetSingletonObj() *Singleton{
once.Do(func(){
fmt.Println("Create Obj")
singleInstance = new(Singleton)
})
return singleInstance
}
func TestSingletonObj(t *testing.T){
var wg sync.WaitGroup
for i:=0; i< 5; i++{
wg.Add(1)
go func(){
obj := GetSingletonObj()
fmt.Printf("%d\n", unsafe.Pointer(obj))
wg.Done()
}()
}
wg.Wait()
}
//6346832
//6346832
//6346832
//6346832
//6346832
也可以通过 init 方法实现单例模式(类似python的package)
仅需任一任务完成就退出 return <-ch 阻塞channel
1 多个 channel (buffer 数量 与 协程数一致 ) 同时进行
buffer 数量 与 协程数 不一致可能会导致 其他线程阻塞
2 <- channel 阻塞channel, 有任一协程返回, 退出
package test_package
import (
"fmt"
"runtime"
"sync"
"testing"
"time"
)
func runTask(id int) string {
time.Sleep(10 * time.Microsecond)
return fmt.Sprintf("The result is from %d", id)
}
func FirstResponse() string {
// 开启的 协程数量 应该和 channel的buffer 一致
// 保证没有协程 hang住, 一直等待
numberOfRunner := 10
ch := make(chan string, numberOfRunner)
for i:=0; i<numberOfRunner; i++{
go func(i int){
ret := runTask(i)
ch <- ret
wg.Done()
}(i)
}
return <-ch //
}
func TestFirstResponse(t *testing.T){
t.Log("Before:", runtime.NumGoroutine()) //2
t.Log(FirstResponse())
t.Log("After:", runtime.NumGoroutine()) // 3
}
必须所有的任务都完成 (等所有任务完成 利用channel阻塞
或者 sync.WaitGroup
)
// 1 chennel阻塞
func runTask(id int) string {
time.Sleep(10 * time.Microsecond)
return fmt.Sprintf("The result is from %d", id)
}
func AllResponse() string {
numberOfRunner := 10
ch := make(chan string, numberOfRunner)
for i:=0; i<numberOfRunner; i++{
go func(i int){
ret := runTask(i)
ch <- ret
}(i)
}
finalRet := ""
for j:=0;j<numberOfRunner;j++{
finalRet += <-ch + "\n"
}
return finalRet
}
// 2 sync.WaitGroup
func runTask(id int) string {
time.Sleep(10 * time.Microsecond)
return fmt.Sprintf("The result is from %d", id)
}
func AllResponse() string {
var wg sync.WaitGroup
numberOfRunner := 10
ch := make(chan string, numberOfRunner)
for i:=0; i<numberOfRunner; i++{
wg.Add(1)
go func(i int){
defer wg.Done()
ret := runTask(i)
ch <- ret
}(i)
}
wg.Wait() // 等待所有的发送完成
close(ch)
finalRet := ""
for {
if v, ok:=<-ch; ok{
finalRet += v + "\n"
}else{
break
}
}
// 或者直接遍历channel
//finalRet := ""
//for v:= range ch{
// finalRet += v + "\n"
// }
return finalRet
}
func TestFirstResponse(t *testing.T){
t.Log("Before:", runtime.NumGoroutine())
t.Log(AllResponse())
t.Log("After:", runtime.NumGoroutine())
}
对象池 (使用buff channel实现
)
# 存放对象的池子
type ReusableObj struct {
}
type ObjPool struct{
buffChan chan *ReusableObj
}
func NewObjPool(numberOfObj int) *ObjPool{
objPool := ObjPool{}
objPool.buffChan = make(chan *ReusableObj, numberOfObj)
for i:=0; i<numberOfObj; i++{
objPool.buffChan <- &ReusableObj{}
}
return &objPool
}
func (p *ObjPool) GetObj(timeout time.Duration)(*ReusableObj, error){
select{
case ret := <-p.buffChan:
return ret, nil
case <-time.After(timeout):
return nil, errors.New("timeout error")
}
}
func (p *ObjPool) ReleaseObj(obj *ReusableObj) error {
select{
case p.buffChan <- obj:
return nil
default:
return errors.New("overflow")
}
}
func TestObjPool(t *testing.T){
pool := NewObjPool(10)
//
//if err := pool.ReleaseObj(&ReusableObj{}); err!=nil{
// t.Error(err)
//}
for i:=0; i<11; i++{
if v, err := pool.GetObj(time.Second*2); err != nil{
t.Error(err)
}else{
fmt.Printf("%T", v) // *test_package.ReusableObj
if err := pool.ReleaseObj(v); err != nil{
t.Error(err)
}
}
}
}
sync.Pool
对象缓存 的概念 (与Processor
相关)
只能缓存一个对象
sync.Pool 对象的获取
• 尝试从私有对象获取
• 私有对象不存在,尝试从当前 Processor 的共享池获取
• 如果当前 Processor 共享池也是空的,那么就尝试去其他
Processor 的共享池获取
• 如果所有⼦池都是空的,最后就⽤⽤户指定的 New 函数
产⽣⼀个新的对象返回
sync.Pool 对象的放回
• 如果私有对象不存在, 则保存为 私有对象
• 如果私有对象存在,放⼊当前 Processor ⼦池的 共享池 中
sync.Pool 对象的 生命周期 (
每次GC
)
• GC 会清除 sync.pool 缓存的对象
• 对象的缓存有效期为下⼀次GC 之前
sync.Pool 总结
• 适合于通过复⽤,降低复杂对象的创建和 GC 代价
• 协程安全,会有锁的开销
• 不适合于做连接池 ⽣命周期受 GC 影响,,需⾃⼰管理⽣命周期的资源的池化
使用方法:
// 这里的值 不会放在 私有对象里, 如果没有 put 放入, 每次get 都会重新 使用 New 函数
pool := &sync.Pool{
New: func() interface{} {
return 0
},
}
arry := pool.Get().(int)
…
pool.Put(10)
func TestSyncPool(t *testing.T){
pool := &sync.Pool{
New: func() interface{}{
fmt.Println("Create a new object")
return 100
},
}
v := pool.Get().(int) // 获取的时候 私有对象,Processor 共享池 都是空的, New 函数创建 100
fmt.Println(v)
pool.Put(3) // 放入 私有对象 3
v1, _ := pool.Get().(int) // 获取私有对象 3
fmt.Println(v1)
runtime.GC() // GC 回收
v2, _ := pool.Get().(int) // 获取私有对象 私有对象,Processor 共享池 都是空的, New 函数创建 100
fmt.Println(v2)
}
// 私有对象 协程安全
func TestSyncPoolMultiGoroutine(t *testing.T){
pool := &sync.Pool{
New: func()interface{}{
fmt.Println("create a new obj")
return 10
},
}
pool.Put(100)
pool.Put(100)
pool.Put(100)
// 私有对象 中放入 三个 100
var wg sync.WaitGroup
for i:=0;i<10;i++{
wg.Add(1)
go func(id int){
defer wg.Done()
t.Log(pool.Get()) // 获取 3个100 之后, 会新创建 10
}(i)
}
wg.Wait()
}
//100
//100
//100
//10
//10
//10
//10
//10
//10
//10
测试
单元测试
表格测试法 (直观): 输入输出对应
内置单元测试框架
• Fail, Error: 该测试失败,该测试继续,其他测试继续执⾏
• FailNow, Fatal: 该测试失败,该测试中⽌,其他测试继续执⾏
go test -v package_test.go
go test -v - cover package_test.go 代码覆盖率
使用 断言
assert
git clone github.com/stretchr/testify/assert
性能测试 Benchmark
go test -bench=.
go test -bench=BenchmarkConcatStringByAdd
func BenchmarkConcatStringByAdd(b *testing.B) {
//与性能测试⽆关的代码
b.ResetTimer()
for i := 0; i < b.N; i++ {
//测试代码
}
b.StopTimer()
//与性能测试⽆关的代码
}
demo 字符串拼接对比
# 1 +
func BenchmarkConcatStringByAdd(b *testing.B){
elems := []string{"1", "2", "3", "4", "5"}
b.ResetTimer()
for i :=0; i<b.N; i++{
ret := ""
for _, elem := range elems{
ret += elem
}
}
b.StopTimer()
}
# 2 buff write
func BenchmarkConcatStringByBytes(b *testing.B){
elems := []string{"1", "2", "3", "4", "5"}
b.ResetTimer()
for i:=0; i<b.N;i++{
var buf bytes.Buffer
for _, elem := range elems{
buf.WriteString(elem)
}
}
b.StopTimer()
}
// 结果
go test -bench=.
goos: windows
goarch: amd64
pkg: ch10/test_package
BenchmarkConcatStringByAdd-4 10000000 173 ns/op
BenchmarkConcatStringByBytes-4 20000000 109 ns/op
PASS
ok ch10/test_package 4.403s
go test -bench=. -benchmem (内存使用情况)
goos: windows
goarch: amd64
pkg: ch10/test_package
BenchmarkConcatStringByAdd-4 5000000 260 ns/op 16 B/op 4 allocs/op
BenchmarkConcatStringByBytes-4 10000000 182 ns/op 112 B/op 1 allocs/op
PASS
ok ch10/test_package 3.809s
Behavior Driven Development BDD
标准
⽤业务领域的语⾔来描述
Story Card
Given 给定条件
When 条件
Then 结果
BDD in Go
go get -u github.com/smartystreets/goconvey/convey
func TestSpec(t *testing.T){
convey.Convey("Given 2 even numbers", t, func(){
a := 2
b := 4
convey.Convey("When add the two numbers", func(){
c :=a + b
convey.Convey("Then the result is still even", func(){
convey.So(c%2, convey.ShouldEqual, 0)
})
})
})
}
=== RUN TestSpec
Given 2 even numbers
When add the two numbers
Then the result is still even .
1 total assertion
--- PASS: TestSpec (0.00s)
PASS
ok command-line-arguments 0.162s
反射和 Unsafe
反射
一个实例对象 包括
类型
和值
1 reflect.TypeOf
vs. reflect.ValueOf
• reflect.TypeOf 返回类型 (reflect.Type)
• 可以从 reflect.Value 获得类型
• 通过 kind 的来判断类型
• reflect.ValueOf 返回值 (reflect.Value)
func TestTypeAndValue(t *testing.T){
var f int64 = 10
t.Log(reflect.TypeOf(f), reflect.ValueOf(f))
t.Log(reflect.ValueOf(f).Type())
}
func CheckType(v interface{}){
t := reflect.TypeOf(v)
switch t.Kind(){
case reflect.Float32, reflect.Float64:
fmt.Println("Float")
case reflect.Int, reflect.Int32, reflect.Int64:
fmt.Println("Integer")
default:
fmt.Println("Unknown")
}
}
func TestBasicType(t *testing.T){
var f float64 = 12
CheckType(&f)
}
2 使用反射 提高代码灵活性
提⾼了程序的灵活性
降低了程序的可读性
降低了程序的性能
(1) ValueOf 值
reflect.ValueOf(*e).FieldByName("Name")
# e.values.get("Name") 获取field的值
(2)TypeOf 类型
reflect.TypeOf(*e).FieldByName("Name")
# e.get("Name") 判断是否有这个field
可以获取 field 类型中的其他属性 (Type, struct tag ..)
例如 nameField ( Name `format: "normal"` )
nameField.Tag.Get("format") --> normal
(3) MethodByName
reflect.ValueOf(e).MethodByName("UpdateAge").Call([]reflect.Value{reflect.ValueOf(1)})
// e 指针
Struct Tag
type BasicInfo struct {
Name string `json:"name"` // Struct Tag
Age int `json:"age"` // Struct Tag
}
3 DeepEqual
比较 切片和map
func TestDeepEqual(t *testing.T){
a := map[int]string {1:"one"}
b := map[int]string {2:"two"}
s1 := []int{1, 2}
s2 := []int{1, 2}
//t.Log(a == b) // invalid operation: a == b (map can only be compared to nil)
//t.Log(s1 == s2) // invalid operation: s1 == s2 (slice can only be compared to nil)
t.Log(reflect.DeepEqual(a, b))
t.Log(reflect.DeepEqual(s1, s2))
}
Unsafe
可以进行 类型转换
i := 10
f := *(*float64)(unsafe.Pointer(&i))
常见架构模式的实现
Pipe-Filter
架构
-
⾮常适合与
数据处理及数据分析
系统• Filter封装数据处理的功能
松耦合:Filter只跟数据(格式)耦合
• Pipe⽤于连接Filter传递数据或者在异步处理过程中缓冲数据流
进程内同步调⽤时,pipe演变为数据在⽅法调⽤间传递
解析请求
参数处理
认证
过滤
排序
填充数据
// 实例 -- "1,2,3" --> []string{"1", "2", "3"} --> []int[1,2,3] --> 6
// 0 抽象接口
package pipefilter
type Request interface{}
type Response interface{}
type Filter interface{
Process(data Request) (Response, error)
// implement 'Filter' with methods: Process
}
// 1 split_filter
var SplitFilterWrongFormatError = errors.New("input data should be string")
type SplitFilter struct{
delimiter string
}
func NewSplitFilter(delimiter string) *SplitFilter{
return &SplitFilter{delimiter}
}
func (sf *SplitFilter) Process(data Request) (Response, error){
// implement 'Filter' with methods: Process
str, ok := data.(string) // 检查数据格式
if !ok{
return nil, SplitFilterWrongFormatError
}
parts_ret := []string{}
parts := strings.Split(str, sf.delimiter)
for _, v := range parts{
parts_ret = append(parts_ret, strings.TrimSpace(v))
}
return parts_ret, nil
}
// 2 to int filter
var ToIntFilterWrongFormatError = errors.New("input should be a []string")
type ToIntFilter struct{
}
func NewToIntFilter() *ToIntFilter{
return &ToIntFilter{}
}
func (tif *ToIntFilter) Process(data Request) (Response, error){
parts, ok := data.([]string)
if !ok{
return nil, ToIntFilterWrongFormatError
}
ret := []int{}
for _, part := range parts{
s, err := strconv.Atoi(part)
if err != nil{
return nil, err
}
ret = append(ret, s)
}
return ret, nil
}
// 3 sum filter
var SumFilterError = errors.New("input data should be a []int")
type SumFilter struct{
}
func NewSumFilter() *SumFilter{
return &SumFilter{}
}
func (sf *SumFilter) Process(data Request)(Response, error){
elems, ok := data.([]int)
if !ok{
return nil, SumFilterError
}
ret :=0
for _, elem := range elems{
ret += elem
}
return ret, nil
}
// 4 主管道, 汇总
type Pipeline struct {
Name string
Filters *[]Filter
}
func NewPipeline(name string, filters ...Filter) *Pipeline{
// 传入多个 filter
return &Pipeline{
Name: name,
Filters: &filters,
}
}
func (p *Pipeline) Process(data Request) (Response, error){
var ret interface{}
var err error
for _, filter := range *p.Filters{
ret, err = filter.Process(data)
if err != nil {
return ret, nil
}
data = ret
}
return ret, err
}
// 5 使用
spliter := NewSplitFilter(",")
inter := NewToIntFilter()
sumer := NewSumFilter()
pipeline := NewPipeline("p1", spliter, inter, sumer)
ret, err := pipeline.Process("1, 2, 3")
Micro-Kernel 微内核模式
Micro Kernel
(常用的IDE 都是 Micro Kernel模式)
• 特点
• 易于扩展
• 错误隔离
• 保持架构⼀致性
• 实现要点
• 内核包含公共流程或通⽤逻辑
• 将可变或可扩展部分规划为扩展点
• 抽象扩展点⾏为,定义接⼝
• 利⽤插件进⾏扩展
package microkernel
import (
"context"
"errors"
"fmt"
"strings"
"sync"
)
const (
Waiting = iota
Running
)
var WrongStateError = errors.New("can not take the operation in the current state")
// 自定义异常
type CollectorsError struct {
CollectorErrors []error
}
func (ce CollectorsError) Error() string {
var strs []string
for _, err := range ce.CollectorErrors {
strs = append(strs, err.Error())
}
return strings.Join(strs, ";")
}
type Event struct {
Source string
Content string
}
type EventReceiver interface {
OnEvent(evt Event)
}
type Collector interface {
Init(evtReceiver EventReceiver) error
Start(agtCtx context.Context) error
Stop() error
Destory() error
}
type Agent struct {
collectors map[string]Collector
evtBuf chan Event
cancel context.CancelFunc
ctx context.Context
state int
}
func (agt *Agent) EventProcessGroutine() {
var evtSeg [10]Event
for {
for i := 0; i < 10; i++ {
select {
case evtSeg[i] = <-agt.evtBuf:
case <-agt.ctx.Done():
return
}
}
fmt.Println(evtSeg)
}
}
func NewAgent(sizeEvtBuf int) *Agent {
agt := Agent{
collectors: map[string]Collector{},
evtBuf: make(chan Event, sizeEvtBuf),
state: Waiting,
}
return &agt
}
func (agt *Agent) RegisterCollector(name string, collector Collector) error {
if agt.state != Waiting {
return WrongStateError
}
agt.collectors[name] = collector
return collector.Init(agt)
}
func (agt *Agent) startCollectors() error {
var err error
var errs CollectorsError
var mutex sync.Mutex
for name, collector := range agt.collectors {
go func(name string, collector Collector, ctx context.Context) {
defer func() {
mutex.Unlock()
}()
err = collector.Start(ctx)
mutex.Lock()
if err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}(name, collector, agt.ctx)
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) stopCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Stop(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) destoryCollectors() error {
var err error
var errs CollectorsError
for name, collector := range agt.collectors {
if err = collector.Destory(); err != nil {
errs.CollectorErrors = append(errs.CollectorErrors,
errors.New(name+":"+err.Error()))
}
}
if len(errs.CollectorErrors) == 0 {
return nil
}
return errs
}
func (agt *Agent) Start() error {
if agt.state != Waiting {
return WrongStateError
}
agt.state = Running
agt.ctx, agt.cancel = context.WithCancel(context.Background())
// ctx 上下文管理
//
go agt.EventProcessGroutine()
//
return agt.startCollectors()
}
func (agt *Agent) Stop() error {
if agt.state != Running {
return WrongStateError
}
agt.state = Waiting
agt.cancel()
return agt.stopCollectors()
}
func (agt *Agent) Destory() error {
if agt.state != Waiting {
return WrongStateError
}
return agt.destoryCollectors()
}
func (agt *Agent) OnEvent(evt Event) {
agt.evtBuf <- evt
}
// test 新建一个 collector 插件 DemoCollector
package microkernel
import (
"context"
"errors"
"fmt"
"testing"
"time"
)
type DemoCollector struct {
evtReceiver EventReceiver
agtCtx context.Context
stopChan chan struct{}
name string
content string
}
func NewCollect(name string, content string) *DemoCollector {
return &DemoCollector{
stopChan: make(chan struct{}),
name: name,
content: content,
}
}
func (c *DemoCollector) Init(evtReceiver EventReceiver) error {
fmt.Println("initialize collector", c.name)
c.evtReceiver = evtReceiver
return nil
}
func (c *DemoCollector) Start(agtCtx context.Context) error {
fmt.Println("start collector", c.name)
for {
select {
case <-agtCtx.Done():
c.stopChan <- struct{}{}
break
default:
time.Sleep(time.Millisecond * 50)
c.evtReceiver.OnEvent(Event{c.name, c.content})
}
}
}
func (c *DemoCollector) Stop() error {
fmt.Println("stop collector", c.name)
select {
case <-c.stopChan:
return nil
case <-time.After(time.Second * 1):
return errors.New("failed to stop for timeout")
}
}
func (c *DemoCollector) Destory() error {
fmt.Println(c.name, "released resources.")
return nil
}
func TestAgent(t *testing.T) {
agt := NewAgent(100)
c1 := NewCollect("c1", "1")
//c2 := NewCollect("c2", "2")
agt.RegisterCollector("c1", c1)
//agt.RegisterCollector("c2", c2)
if err := agt.Start(); err != nil {
fmt.Printf("start error %v\n", err)
}
time.Sleep(time.Second * 1)
agt.Stop()
agt.Destory()
}
常见的开发任务
解析Json
内置的解析
json.Marshal
,json.Unmarshal
适用于配置文件之类的初始化
利⽤反射实现,通过FeildTag来标识对应的json 值
性能差一些(利⽤反射)
type BasicInfo struct {
Name string `json:"name"`
Age int `json:"age"`
}
type JobInfo struct {
Skills []string `json:"skills"`
}
type Employee struct {
BasicInfo BasicInfo `json:"basic_info"`
JobInfo JobInfo `json:"job_info"`
}
var jsonStr = `{
"basic_info":{
"name":"Mike",
"age":30
},
"job_info":{
"skills":["Java","Go","C"]
}
} `
func TestJson(t *testing.T){
e := new(Employee)
err := json.Unmarshal([]byte(jsonStr), e)
if err != nil {
t.Error(err)
}
fmt.Println(*e)
if v, err := json.Marshal(e); err == nil {
fmt.Println(string(v))
}else{
t.Error(err)
}
}
EasyJson
go get -u github.com/mailru/easyjson/...
`根据定义的模板文件 生成对应的解析代码
easyjson -all <结构定义>.go
使用:
func TestEasyJson(t *testing.T) {
e := new(Employee)
_ = e.UnmarshalJSON([]byte(jsonStr))
fmt.Println(e)
if v, err := e.MarshalJSON(); err != nil {
t.Error(err)
}else{
fmt.Println(string(v))
}
}
对比两种方式
func BenchmarkEmbeddedJson(b *testing.B) {
b.ResetTimer()
e := new(Employee)
for i := 0; i < b.N; i++{
err := json.Unmarshal([]byte(jsonStr), e)
if err != nil {
b.Error(err)
}
if _, err = json.Marshal(e); err != nil {
b.Error(err)
}
}
}
func BenchmarkEasyJson(b *testing.B){
b.ResetTimer()
e := new(Employee)
for i:=0; i<b.N;i++{
err := e.UnmarshalJSON([]byte(jsonStr))
if err != nil {
b.Error(err)
}
if _, err = e.MarshalJSON(); err != nil {
b.Error(err)
}
}
}
goos: windows
goarch: amd64
pkg: microkernal/micro_kernal
BenchmarkEmbeddedJson-4 200000 5369 ns/op
BenchmarkEasyJson-4 1000000 1231 ns/op
PASS
HTTP
服务
内置的
net/http
路由规则
• URL 分为两种,末尾是 /:表示⼀个⼦树,后⾯可以跟其他⼦路径; 末尾不
是 /,表示⼀个叶⼦,固定的路径
• 以/ 结尾的 URL 可以匹配它的任何⼦路径,⽐如 /images 会匹配 /images/
cute-cat.jpg
• 它采⽤最⻓匹配原则,如果有多个匹配,⼀定采⽤匹配路径最⻓的那个进⾏处
理
• 如果没有找到任何匹配项,会返回 404 错误
func main() {
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request){
fmt.Fprintf(w, "hello world")
})
http.HandleFunc("/time/", func(w http.ResponseWriter, r *http.Request){
t := time.Now()
timeStr := fmt.Sprintf("time: %s", t)
w.Write([]byte(timeStr))
})
http.ListenAndServe(":8007", nil)
}
httprouter
func Hello(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
fmt.Fprintf(w, "hello, %s!\n", ps.ByName("name"))
}
func Index(w http.ResponseWriter, r *http.Request, _ httprouter.Params){
w.Write([]byte("index!!"))
}
func main() {
router := httprouter.New()
router.GET("/", Index)
router.GET("/hello/:name", Hello)
log.Fatal(http.ListenAndServe(":8080", router))
}
关于
⾯向资源的架构 ROA
(Resource Oriented Architecture) 以及RESTful
在软件工程中,面向资源的体系结构(ROA)是一种软件体系结构和编程范式,
用于通过具有“ RESTful”接口的资源互联网络的形式来支持设计和开发软件
http://www.people_portol.com/employee/{name}
type Employee struct {
ID string `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}
var employeeDB map[string]*Employee
// 初始化
func init() {
employeeDB = map[string]*Employee{}
employeeDB["mike"] = &Employee{"e-1", "mike", 35}
employeeDB["rose"] = &Employee{"e-2", "rose", 32}
}
func Index(w http.ResponseWriter, r *http.Request, _ httprouter.Params){
fmt.Fprint(w, "Index Page!")
}
func GetEmployeeByName(w http.ResponseWriter, r *http.Request, ps httprouter.Params){
qName := ps.ByName("name")
var (
ok bool
info *Employee
infoJson []byte
err error
)
if info, ok = employeeDB[qName]; !ok{
w.Write([]byte("{\"error\": \"not found\"}"))
return
}
if infoJson, err = json.Marshal(info); err != nil {
w.Write([]byte(fmt.Sprintf("{\"error\": \"%s\"}", err)))
return
}
w.Write(infoJson)
}
func main() {
router := httprouter.New()
router.GET("/", Index)
router.GET("/employee/:name", GetEmployeeByName)
log.Fatal(http.ListenAndServe(":8007", router))
}
性能分析, 调优
准备工作
• 安装 graphviz
• brew install graphviz
• 将 $GOPATH/bin 加⼊ $PATH
• Mac OS: 在 .bash_profile 中修改路径
• 安装 go-torch(v1.11+ 内置在go基本库中)
• go get github.com/uber/go-torch
• 下载并复制 flamegraph.pl (https://github.com/brendangregg/FlameGraph)⾄ $GOPATH/bin 路径下
• 将 $GOPATH/bin 加⼊ $PATH
调优实例
调优步骤:
-
生成profile文件
go test 输出 profile
go test -bench=. -cpuprofile=cpu.prof // 查看 cpu go test -bench=. -memprofile=mem.prof // 查看内存 go test -bench=. -blockprofile=block.prof
-
查看profile文件
go tool pprof cpu.prof go-torch cpu.prof (查看火炬图片)
-
top -cum
// 查看运行时间flat flat% sum% cum cum% 0 0% 0% 1.58s 76.33% ch47/optim.BenchmarkProcessRequestOld 0.05s 2.42% 2.42% 1.58s 76.33% ch47/optim.processRequestOld
-
查看具体的函数运行情况
list processRequestOld 50ms 1.58s (flat, cum) 76.33% of Total . . 41: return reps . . 42:} . . 43: . . 44:func processRequestOld(reqs []string) []string { . . 45: reps := []string{} 10ms 10ms 46: for _, req := range reqs { . . 47: reqObj := &Request{} . 720ms 48: json.Unmarshal([]byte(req), reqObj) . . 49: ret := "" 10ms 10ms 50: for _, e := range reqObj.PayLoad { 30ms 450ms 51: ret += strconv.Itoa(e) + "," . . 52: } . 10ms 53: repObj := &Response{reqObj.TransactionID, ret} . 360ms 54: repJson, err := json.Marshal(&repObj) . . 55: if err != nil { . . 56: panic(err) . . 57: } . 20ms 58: reps = append(reps, string(repJson)) . . 59: } . . 60: return reps . . 61:}
-
优化前后对比
1 json 转换
1.1 内置 json.Marshal -- (使用反射) 1.2 使用easyjson
2 字符串的拼接
2.1 字符串"+", go 语言中 string 是不可变的 slice, 每次相加都会有新的变量申请 2.2 使用buff写
常⻅分析指标
• Wall Time
• CPU Time
• Block Time
• Memory allocation
• GC times/time spent
保证高性能代码
合理的使用
Lock
go普通的map是不支持并发的,换而言之, 不是线程(goroutine)安全的
golang 1.6之后,并发地读写会直接panic
fatal error: concurrent map read and map write
线程安全的 Map
在 go 1.9+
后可以:
1 使用 sync.RWMutex 锁
2 使用 sync.map
3 使用第三方实现 https://github.com/orcaman/concurrent-map
-
sync.Map
(适合 读多写少的情况)原理:
空间换时间通过冗余的两个数据结构(read、dirty),实现加锁对性能的影响 引入两个map将读写分离到不同的map read map 提供 并发读 和 已存元素 原子写 dirty map 负责读写
适合 读多写少,且 Key 相对稳定的环境
空间换时间的⽅案 (采⽤指针的⽅式间接实现值的映射)
存储空间会较 built-in map ⼤
concurrent-map
(读写都很多的情况)
//1 sync.map
package maps
import "sync"
type SyncMap struct {
m sync.Map
}
func CreateSyncMap() *SyncMap{
return &SyncMap{}
}
func (s *SyncMap) Set(key string, value interface{}){
s.m.Store(key, value)
}
func (s *SyncMap) Get(key string)(interface{}, bool){
return s.m.Load(key)
}
func (s *SyncMap) Del(key string){
s.m.Delete(key)
}
// 2 rw map
package maps
import "sync"
type RWLockMap struct {
m map[interface{}]interface{}
lock sync.RWMutex
}
func (m *RWLockMap) Get(key string) (interface{}, bool){
m.lock.RLock()
v, ok := m.m[key]
m.lock.RUnlock()
return v, ok
}
func (m *RWLockMap) Set(key string, value interface{}){
m.lock.Lock()
m.m[key] = value
m.lock.Unlock()
}
func (m *RWLockMap) Del(key string){
m.lock.Lock()
delete(m.m, key)
m.lock.Unlock()
}
func CreateRWLockMap() *RWLockMap {
m := make(map[interface{}]interface{})
return &RWLockMap{m: m}
}
// 3 concurrent map
package maps
import "src/github.com/orcaman/concurrent-map"
type ConcurrentMap struct {
m cmap.ConcurrentMap
}
func CreateConcurrentMap() *ConcurrentMap{
return &ConcurrentMap{m: cmap.New()}
}
func (c *ConcurrentMap) Set(key string, value interface{}){
c.m.Set(key, value)
}
func (c *ConcurrentMap) Get(key string)(interface{}, bool){
return c.m.Get(key)
}
func (c *ConcurrentMap) Del(key string){
c.m.Remove(key)
}
// benchmark
package maps
import (
"strconv"
"sync"
"testing"
)
const (
NumOfReader = 10
NumOfWriter = 100
)
type Map interface{
Set(key string, val interface{})
Get(key string) (interface{}, bool)
Del(key string)
}
func benchmarkMap(b *testing.B, hm Map){
for i := 0; i<b.N; i++{
var wg sync.WaitGroup
for j:=0; j< NumOfWriter; j++{
wg.Add(1)
go func() {
for l:=0; l< 100; l++{
hm.Set(strconv.Itoa(i), i*i)
hm.Set(strconv.Itoa(i), i*i)
hm.Del(strconv.Itoa(i))
}
wg.Done()
}()
}
for i:=0; i< NumOfReader; i++{
wg.Add(1)
go func() {
for i:=0; i<100; i++{
hm.Get(strconv.Itoa(i))
}
}()
wg.Done()
}
wg.Wait()
}
}
func BenchmarkSyncMap(b *testing.B) {
b.Run("map with RWLock", func(b *testing.B) {
hm := CreateRWLockMap()
benchmarkMap(b, hm)
})
b.Run("sync.map", func(b *testing.B) {
hm := CreateSyncMap()
benchmarkMap(b, hm)
})
b.Run("concurrent map", func(b *testing.B) {
hm := CreateConcurrentMap()
benchmarkMap(b, hm)
})
}
减少锁的影响范围
减少发⽣锁冲突的概率
• sync.Map
• ConcurrentMap
• 避免锁的使⽤
LAMX Disruptor:https://martinfowler.com/articles/lmax.html
GC
的使用 –GC友好的代码
- 避免内存分配和复制
复杂对象尽量传递引⽤
• 数组的传递
• 结构体传递
package gc
import "testing"
const NumOfElemss = 1000
type Content struct{
Detail [10000] int
}
func withValue(arr [NumOfElemss]Content) int{
return 0
}
func withReference(arr *[NumOfElemss]Content) int {
return 0
}
func TestFn(t *testing.T){
var arr [NumOfElemss]Content
withValue(arr)
withReference(&arr)
}
func BenchmarkPassingArrayWithValue(b *testing.B) {
var arr [NumOfElemss]Content
b.ResetTimer()
for i := 0; i < b.N; i++ {
withValue(arr)
}
b.StopTimer()
}
func BenchmarkPassingArrayWithRef(b *testing.B) {
var arr [NumOfElemss]Content
//
b.ResetTimer()
for i := 0; i < b.N; i++ {
withReference(&arr)
}
b.StopTimer()
})
}
goos: windows
goarch: amd64
pkg: ch47/gc
BenchmarkPassingArrayWithValue-4 100 16580098 ns/op
BenchmarkPassingArrayWithRef/ccc-4 2000000000 0.36 ns/op
打开 GC ⽇志
在程序执⾏之前加上环境变量 GODEBUG=gctrace=1 ,
如:GODEBUG=gctrace=1 go test -bench=.
GODEBUG=gctrace=1 go run main.go
go tool trace
go test -bench=BenchmarkPassingArrayWithRef -trace=trace.out
go tool trace trace.out
普通程序输出 trace 信息
package main
import (
"os"
"runtime/trace"
)
func main() {
f, err := os.Create("trace.out")
if err != nil {
panic(err)
}
defer f.Close()
err = trace.Start(f)
if err != nil {
panic(err)
}
defer trace.Stop()
// Your program here
}
-
初始化
切片
的大小 (避免自动扩容
)⾃动扩容是有代价的
package gc
import "testing"
const NumOfElemss = 100000
func BenchmarkAutoGrow(b *testing.B){
for i:=0; i<b.N;i++{
s := []int{}
for j:=0; j<NumOfElemss; j++{
s = append(s, j)
}
}
}
func BenchmarkProperInit(b *testing.B){
for i:=0; i<b.N; i++{
s := make([]int, 0, NumOfElemss)
for j:=0; j<NumOfElemss; j++{
s = append(s, j)
}
}
}
func BenchmarkOverSize(b *testing.B){
for i:=0; i<b.N; i++{
s := make([]int, 0, 1000)
for j:=0; j<NumOfElemss; j++{
s = append(s, j)
}
}
}
BenchmarkAutoGrow-4 2000 814991 ns/op
BenchmarkProperInit-4 10000 206999 ns/op
BenchmarkOverSize-4 2000 774001 ns/op
高可用的架构设计
字符串拼接的函数对比
string 是不可变类型
每次add都会生成一个新的 string, 需要新的内存
之前的 string 也会 影响 GC
go 1.10+
推荐使用 strings.Buiilder
go 1.10-
推荐使用 bytes.Buffer
来进行字符串(缓存区)的拼接
package stringmethod
import (
"bytes"
"fmt"
"strconv"
"strings"
"testing"
)
const numbers = 100
func BenchmarkSprintf(b *testing.B) {
b.ResetTimer()
for idx := 0; idx < b.N; idx++ {
var s string
for i := 0; i < numbers; i++ {
s = fmt.Sprintf("%v%v", s, i)
}
}
b.StopTimer()
}
func BenchmarkStringBuilder(b *testing.B) {
b.ResetTimer()
for idx := 0; idx < b.N; idx++ {
var builder strings.Builder
for i := 0; i < numbers; i++ {
builder.WriteString(strconv.Itoa(i))
}
_ = builder.String()
}
b.StopTimer()
}
func BenchmarkBytesBuf(b *testing.B) {
b.ResetTimer()
for idx := 0; idx < b.N; idx++ {
var buf bytes.Buffer
for i := 0; i < numbers; i++ {
buf.WriteString(strconv.Itoa(i))
}
_ = buf.String()
}
b.StopTimer()
}
func BenchmarkStringAdd(b *testing.B) {
b.ResetTimer()
for idx := 0; idx < b.N; idx++ {
var s string
for i := 0; i < numbers; i++ {
s += strconv.Itoa(i)
}
}
b.StopTimer()
}
BenchmarkSprintf-4 100000 24400 ns/op
BenchmarkStringBuilder-4 1000000 2066 ns/op
BenchmarkBytesBuf-4 1000000 1825 ns/op
BenchmarkStringAdd-4 200000 9024 ns/op
面向错误的设计 fault oriented
-
隔离错误模式 (
不影响主要的程序运行
)1 设计
micro kenal 模式 插件
2 部署
单机很难做到错误隔离
3 重⽤ vs 隔离
- 冗余
单点失效
限流
增加服务器
使用限流
来避免单点失效
token bucket
每滴水 --> token (请求权限)
滴水频率 --> 产生token的逻辑
- 超时处理 –
避免慢响应
A quick rejection
is better than
a slow response
⾯向恢复的设计 rocovery oriented
避免无效的程序监控
• 注意僵⼫进程
• 池化资源耗尽
• 死锁
-
Let it Crash!
未知错误 如果没有 好的 recover方法, 最好让进程退出
然后 进程监控系统 重启异常进程
reboot
-
如何构建可恢复的系统
• 拒绝单体系统
• ⾯向错误和恢复的设计
• 在依赖服务不可⽤时,可以继续存活 • 快速启动 • ⽆状态
-
与客户端协商
服务器: “我太忙了,请慢点发送数据” Client: “好,我⼀分钟后再发送”
Chaos Engineering 混沌 Engineering
Netflix工程师创建了Chaos Monkey,使用该工具可以在整个系统中在随机位置引发故障。正如GitHub上的工具维护者所说,
“Chaos Monkey会随机终止在生产环境中运行的虚拟机实例和容器。”
通过Chaos Monkey,工程师可以快速了解他们正在构建的服务是否健壮,是否可以弹性扩容,是否可以处理计划外的故障。
采用某种形式的混沌工程 >> 来提高现代架构的可靠性
-
Chaos Engineering 原则
• Build a Hypothesis around Steady State Behavior • Vary Real-world Events • Run Experiments in Production • Automate Experiments to Run Continuously • Minimize Blast Radius