微信号:flysnow_org

介绍:打杂师,观察家.

一起用golang之Go程序的套路

2017-08-27 11:13 山楂大卷

系统性地介绍golang基础的资料实在太多了,这里不再一一赘述。本文的思路是从另一个角度来由浅入深地探究下Go程序的套路。毕竟纸上得来终觉浅,所以,能动手就不要动口。有时候几天不写代码,突然间有一天投入进来做个东西,才恍然发觉,也只有敲代码的时候,才能找回迷失的自己,那可以忘掉一切的不开心。

Hello world

package main

import (
    "fmt"
)

func main() {
    fmt.Println("hello world")
}

go程序结构从整体上来说就是这样的,第一行看起来这一定就是包头声明了,程序以包为单位,一个文件夹是一个包,一个包下可能有多个文件,但是包名都是同一个。相对C/C++程序的include来说,这里是import,后面跟的就是别的包名,一个包里定义的变量或类型,本包内都可见,若首字母大小,则可以被导出。如果引入了程序里不使用的包,编译会报错,报错,错。声明不使用的变量也一样,对,会报错。这里行尾没有分号,左大括号必须那样放,缩进也不用你操心等等,编码风格中的很多问题在这里都不再是问题,是的,go fmt帮你都搞定了,所以你看绝大部分go程序风格都好接近的。写一段时间代码后,你会发现,这种风格确实简单,干净利落。

本文重点

通过一些概念的学习和介绍,设计并实现个线程池,相信很多地方都可能用到这种模型或各种变形。

变量

变量的声明、定义、赋值、指针等不想啰嗦了,去别的地方学吧。

结构体

我们先来定义一个结构体吧

package package1
type User struct {    Name string    addr int    age  int
}

你一定注意到了,Name首字母是大写的,在package2包中,import package1后就可以通过user.Name访问Name成员了,Name是被导出的。但addr和age在package2中就不能直接访问了,这俩没有被导出,只能在package1包中被直接访问,也就是私有的。那如何在package2中获取没有被导出的成员呢?我们来看下方法。

方法

func (u User) GetAge() string {
    return u.age
}

func(u *User) SetAge(age int){
    u.age = age
}

方法的使用和C++或者Java都很像的。下面代码段中user的类型是*User,你会发现,无论方法的接收者是对象还是指针,方法调用时都只用.,而代表指针的->已经不在了。

user := &User{
        Name: name,
        addr: addr,
        age:  age,
}
user.SetAge(100)
fmt.Println(user.GetAge())

还有常用的构造对象的方式是这样的

func NewUser(name string, addr string, age int) *User {
    return &User{
        Name: name,
        addr: addr,
        age:  age,
    }
}
    user := new(User)
    user := &User{}//与前者等价
    user := User{}

组合与嵌套

Go中没有继承,没有了多态,也没有了模板。争论已久的继承与组合问题,在这里也不是问题了,因为已经没得选择了。比如我想实现个线程安全的整型(假设只用++和--),可能这么来做

type safepending struct {
    pending int
    mutex   sync.RWMutex
}

func (s *safepending) Inc() {
    s.mutex.Lock()
    s.pending++
    s.mutex.Unlock()
}

func (s *safepending) Dec() {
    s.mutex.Lock()
    s.pending--
    s.mutex.Unlock()
}

func (s *safepending) Get() int {
    s.mutex.RLock()
    n := s.pending
    s.mutex.RUnlock()
    return n
}

也可以用嵌套写法

type safepending struct {
    pending int
    sync.RWMutex
}

func (s *safepending) Inc() {
    s.Lock()
    s.pending++
    s.Unlock()
}

func (s *safepending) Dec() {
    s.Lock()
    s.pending--
    s.Unlock()
}

func (s *safepending) Get() int {
    s.RLock()
    n := s.pending
    s.RUnlock()
    return n
}

这样safepending类型将直接拥有sync.RWMutex类型中的所有属性,好方便的写法。

interface

一个interface类型就是一个方法集,如果其他类型实现了interface类型中所有的接口,那我们就可以说这个类型实现了interface类型。举个例子:空接口interface{}包含的方法集是空,也就可以说任何类型都实现了它,也就是说interface{}可以代表任何类型,类型直接的转换看下边的例子吧。

实现一个小顶堆

首先定义一个worker结构体, worker对象中存放很多待处理的request,pinding代表待处理的request数量,以worker为元素,实现一个小顶堆,每次Pop操作都返回负载最低的一个worker。
golang标准库中提供了heap结构的容器,我们仅需要实现几个方法,就可以实现一个堆类型的数据结构了,使用时只需要调用标准库中提供的Init初始化接口、Pop接口、Push接口,就可以得到我们想要的结果。我们要实现的方法有Len、Less、Swap、Push、Pop,请看下边具体代码。另外值得一提的是,山楂君也是通过标准库中提供的例子学习到的这个知识点。

type Request struct {
    fn    func() int
    data  []byte
    op    int
    c     chan int
}

type Worker struct {
    req     chan Request
    pending int
    index   int
    done    chan struct{}
}

type Pool []*Worker

func (p Pool) Len() int {
    return len(p)
}
func (p Pool) Less(i, j int) bool {
    return p[i].pending < p[j].pending
}

func (p Pool) Swap(i, j int) {
    p[i], p[j] = p[j], p[i]
    p[i].index = i
    p[j].index = j
}

func (p *Pool) Push(x interface{}) {
    n := len(*p)
    item := x.(*Worker)
    item.index = n
    *p = append(*p, item)
}

func (p *Pool) Pop() interface{} {
    old := *p
    n := len(*p)
    item := old[n-1]
    //item.index = -1
    *p = old[:n-1]
    return item
}

pool的使用

package main

import (
    "container/heap"
    "log"
    "math/rand"
)

var (
    MaxWorks = 10000
    MaxQueue = 1000
)

func main() {
    pool := new(Pool)
    for i := 0; i < 4; i++ {
        work := &Worker{
            req:     make(chan Request, MaxQueue),
            pending: rand.Intn(100),
            index:   i,
        }
        log.Println("pengding", work.pending, "i", i)
        heap.Push(pool, work)
    }

    heap.Init(pool)
    log.Println("init heap success")
    work := &Worker{
        req:     make(chan Request, MaxQueue),
        pending: 50,
        index:   4,
    }
    heap.Push(pool, work)
    log.Println("Push worker: pending", work.pending)
    for pool.Len() > 0 {
        worker := heap.Pop(pool).(*Worker)
        log.Println("Pop worker:index", worker.index, "pending", worker.pending)
    }
}

程序的运行结果如下,可以看到每次Pop的结果都返回一个pending值最小的一个work元素。

2017/03/11 12:46:59 pengding 81 i 0
2017/03/11 12:46:59 pengding 87 i 1
2017/03/11 12:46:59 pengding 47 i 2
2017/03/11 12:46:59 pengding 59 i 3
2017/03/11 12:46:59 init heap success
2017/03/11 12:46:59 Push worker: pending 50
2017/03/11 12:46:59 Pop worker:index 4 pending 47
2017/03/11 12:46:59 Pop worker:index 3 pending 50
2017/03/11 12:46:59 Pop worker:index 2 pending 59
2017/03/11 12:46:59 Pop worker:index 1 pending 81
2017/03/11 12:46:59 Pop worker:index 0 pending 87

细心的你肯能会发现,不是work么,怎么没有goroutine去跑任务?是的山楂君这里仅是演示了小顶堆的构建与使用,至于如何用goroutine去跑任务,自己先思考一下吧。
其实加上类似于下边这样的代码就可以了

func (w *Worker) Stop() {
    w.done <- struct{}{}
}

func (w *Worker) Run() {
    go func() {
        for {
            select {
            case req := <-w.req:
                req.c <- req.fn()
            case <-w.done:
                break
            }
        }
    }()
}

golang的并发

golang中的并发机制很简单,掌握好goroutine、channel以及某些程序设计套路,就能用的很好。当然,并发程序设计中存在的一切问题与语言无关,只是每种语言中基础设施对此支持的程度不一,Go程序中同样都要小心。

goroutine

官方对goroutine的描述:

They're called goroutines because the existing terms—threads, coroutines, processes, and so on—convey inaccurate connotations. A goroutine has a simple model: it is a function executing concurrently with other goroutines in the same address space. It is lightweight, costing little more than the allocation of stack space. And the stacks start small, so they are cheap, and grow by allocating (and freeing) heap storage as required.
Goroutines are multiplexed onto multiple OS threads so if one should block, such as while waiting for I/O, others continue to run. Their design hides many of the complexities of thread creation and management.
Prefix a function or method call with the go keyword to run the call in a new goroutine. When the call completes, the goroutine exits, silently. (The effect is similar to the Unix shell's & notation for running a command in the background.)

启动一个goroutine,用法很简单:

go DoSomething()

channel

看channel的描述:

A channel provides a mechanism for concurrently executing functions to communicate by sending and receiving values of a specified element type. The value of an uninitialized channel is nil.

简而言之,就是提供了goroutine之间的同步与通信机制。

共享内存?OR 通信?

Don't communicate by sharing memory; share memory by communicating

这就是Go程序中很重要的一种程序套路。拿一个具体的小应用场景来说吧:一个Map类型的数据结构,其增删改查操作可能在多个线程中进行,我们会用什么样的方案来实现呢?

  1. 增删改查操作时加锁

  2. 实现一个线程安全的Map类型

  3. 增删改查操作限定在线程T中,其他线程如果想进行增删改操作,统一发消息给线程T,由线程T来进行增删操作(假设其他线程没有Map的查询操作)

对于方案3其实就是对Go程序这种套路的小应用,这种思想当然和语言无关,但是在Go语言中通过“通信”来共享内存的思路非常容易实现,有原生支持的goroutine、channel、select、gc等基础设施,也许你会有"大消息"传递场景下的性能顾虑,但channel是支持引用类型的传递的,且会自动帮你进行垃圾回收,一个大结构体的引用类型实际上可能才占用了十几个字节的空间。这实在是省去了山楂君很多的功夫。看Go程序的具体做法:

type job struct {
    // something
}

type jobPair struct {
    key   string
    value *job
}

type worker struct {
    jobqueue map[string]*job // key:UserName
    jobadd   chan *jobPair
}

// 并不是真正的map insert操作,仅发消息给另外一个线程
func (w *worker) PushJob(user string, job *job) {
    pair := &jobPair{
        key:   user,
        value: job,
    }
    w.jobadd <- pair
}

// 并不是真正的map delete操作,仅发消息给另外一个线程
func (w *worker) RemoveJob(user string) {
    w.jobdel <- user
}

func (w *worker) Run() {
    go func() {
        for {
            select {
            case jobpair := <-w.jobadd:
                w.insertJob(jobpair.key, jobpair.value)
            case delkey := <-w.jobdel:
                w.deleteJob(delkey)
            //case other channel
            //     for _, job := range w.jobqueue {
                    // do something use job
            //        log.Println(job)
            //    }
            }
        }
    }()
}
func (w *worker) insertJob(key string, value *job) error {
    w.jobqueue[key] = value
    w.pending.Inc()
    return nil
}

func (w *worker) deleteJob(key string) {
    delete(w.jobqueue, key)
    w.pending.Dec()
}

线程池

模型详见下边流程图


线程池模型.png

由具体业务的生产者线程生成一个个不同的job,通过共同的Balance均衡器,将job分配到不同的worker去处理,每个worker占用一个goroutine。在job数量巨多的场景下,这种模型要远远优于一个job占用一个goroutine的模型。并且可以根据不同的业务特点以及硬件配置,配置不同的worker数量以及每个worker可以处理的job数量。

我们可以先定义个job结构体,根据业务不同,里边会包含不同的属性。

type job struct {
    conn     net.Conn
    opcode   int
    data     []byte
    result     chan ResultType //可能需要返回处理结果给其他channel
}
type jobPair struct {
    key   string
    value *job
}

然后看下worker定义

type worker struct {
    jobqueue  map[string]*job // key:UserName
    broadcast chan DataType
    jobadd    chan *jobPair
    jobdel    chan string
    pending   safepending
    index     int
    done      chan struct{}
}

func NewWorker(idx int, queue_limit int, source_limit int, jobreq_limit int) *worker {
    return &worker{
        jobqueue:  make(map[string]*job, queue_limit),
        broadcast: make(chan DataType, source_limit), //4家交易所
        jobadd:    make(chan jobPair, jobreq_limit),
        jobdel:    make(chan string, jobreq_limit),
        pending:   safepending{0, sync.RWMutex{}},
        index:     idx,
        done:      make(chan struct{}),
    }
}

func (w *worker) PushJob(user string, job *job) {
    pair := jobPair{
        key:   user,
        value: job,
    }
    w.jobadd <- pair
}

func (w *worker) RemoveJob(user string) {
    w.jobdel <- user
}

func (w *worker) Run(wg *sync.WaitGroup) {
    wg.Add(1)
    go func() {
        log.Println("new goroutine, worker index:", w.index)
        defer wg.Done()
        ticker := time.NewTicker(time.Second * 60)
        for {
            select {
            case data := <-w.broadcast:
                for _, job := range w.jobqueue {
                    log.Println(job, data)
                }
            case jobpair := <-w.jobadd:
                w.insertJob(jobpair.key, jobpair.value)
            case delkey := <-w.jobdel:
                w.deleteJob(delkey)
            case <-ticker.C:
                w.loadInfo()
            case <-w.done:
                log.Println("worker", w.index, "exit")
                break
            }
        }
    }()
}

func (w *worker) Stop() {
    go func() {
        w.done <- struct{}{}
    }()
}
func (w *worker) insertJob(key string, value *job) error {
    w.jobqueue[key] = value
    w.pending.Inc()
    return nil
}

func (w *worker) deleteJob(key string) {
    delete(w.jobqueue, key)
    w.pending.Dec()
}

结合上边提到的小顶堆的实现,我们就可以实现一个带负载均衡的线程池了。
一种模式并不能应用于所有的业务场景,山楂君觉得重要的是针对不同的业务场景去设计或优化编程模型的能力,以上有不妥之处,欢迎吐槽或指正,喜欢也可以打赏。

参考文献

  1. https://blog.golang.org/share-memory-by-communicating

  2. https://golang.org/doc/effective_go.html

  3. http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/



作者:山楂大卷
链接:http://www.jianshu.com/p/215510810c59
來源:简书


 
飞雪无情 更多文章 一路有你,风雨同行,我的结婚纪念日 Go语言经典库使用分析(五)| Negroni 中间件(一) Go语言 | Go 1.9 新特性 Type Alias详解 工具 | 常用工具镜像网站又更新了(Golang、VsCode、Android Studio等)& Go语言经典库使用分析(四)| Gorilla Handlers 源代码实现分析
猜您喜欢 深度剖析10万条收费站数据,图解交通数据之收费站(统计分析篇) Ceph管理节点故障mds迁移到存储节点 使用 BitMask 写出高效的程序 拥抱 Android Studio 之五:Gradle 插件开发 上海Spark Meetup第十一次聚会