11 September 2014

目录

0. 各个语言的并行编程方案

go语言的其中一个设计目标是适应现代多核cpu,强调并发编程。

为了充分使用多核、实现高并发,有以下一些方案:

  1. C/C++

    • 语言本身没有支持并发,依赖标准库或第三方库。
    • 多进程方案。
    • 多线程方案。C11/C++11标准库都已提供多线程设施。
    • Windows提供APC(异步过程调用),将函数分发给一个内部线程池异步执行。
    • OS X类似Windows,提供GCD(Grand Central Dispatch)机制,也是将函数分发给内部线程池异步执行。Apple公司通过给C语言增加匿名函数block机制,提供了一定便利性
  2. Node.js

    • 采用线程池-异步调用机制。
    • 也可采用多进程方案。
  3. Java

    • 提供线程,但与jvm实现有关。
  4. Scala(也适用其他jvm语言)

    • 旧:Actor
    • 新:Akka
  5. Lua

    • 语言与标准库都不支持并发。
    • 但可采用一虚拟机一线程的方案。

而go/rust等新一代语言,在语法层面已经支持并发,且采用与上述语言不太一样的方案。

go采用了多线程,线程内运行多个协程的方案

  • 使用channel在goroutine之间安全传输数据
  • 使用select语句实现channel多路复用
  • 运用goroutine调度封装OS级别的IO多路复用机制
    • Windows:完成端口
    • OS X/BSD: kqueue
    • Linux: epoll

1. 相关语法

1.1 执行的最小单位:goroutine

在go中,最小的执行单元是goroutine。goroutine的本质是用户态、非抢占式协程。

  • gc的实现是:一个线程运行多个goroutine,一个程序多个线程。(用户态)
  • gcc-go的实现是:每个goroutine对应一个线程。
  • goroutine一旦运行起来必须显式让出控制权才能使同一线程内的其他goroutine获得运行的机会。(非抢占式)。
  • goroutine访问数据建议使用channel,但并未限制多个goroutine访问同一变量,因此仍然存在竞争问题。go还是在sync包中提供了各种锁、原子操作机制。
  • go的垃圾回收一小部分工作可以并行完成。

启动一个goroutine

func foo() {
    ...
}

// 以函数foo作为goroutine的入口
go foo()

// 以匿名函数作为goroutine的入口
go func () {
    ...
} ()

主动让出控制权,触发调度:

import "runtime"
import "time"

func foo(n int) {
    for i := 0; i < n; i++ {
        runtime.Gosched() // 主动让出cpu,触发调度
    }
}

func main() {
    go foo(10000)
    time.Sleep(10 * time.Second)
}

如果主goroutine退出,所有的goroutine都会被结束,没有类似Java的daemon线程的概念。

package main

import "time"
import "fmt"

func main() {
    go func () { // 这个goroutine来不及执行就会被结束
        for i := 0; i < 100; i++ {
            time.Sleep(2 * time.Second)
            fmt.Println("tick")
        }
    } ()
}

环境变量GOMAXPROCS可以指定可以运行go用户级代码的操作系统最大线程数。

1.2 数据的传递通道:channel

channel是引用类型,需要通过make函数创建。

c1 := make(chan int) // 无缓冲的channel,因为默认缓存被设置为0
c2 := make(chan bool, 1) // 有缓冲的channel,并设置缓存为1

channel类似blocking queue,对于写入,当写满缓存后会阻塞;对于读取,但缓存为空时会阻塞。 对于无缓存channel,每次写入都会阻塞,直到读端读取一次数据后才能继续写入。

可以显式指定channel的方向来提高类型安全性:

func ping(pings chan<- string, msg string) {
    // pings只写
}
func pong(pings <-chan string, pongs chan<- string) {
    // pings只读,pongs只写
}

channel使用后需要用close函数显式关闭。

  • 如果尝试关闭已经关闭后的channel会导致panic: runtime error: close of closed channel
  • 如果读取已经关闭的channel会读到空数据:bool类型为false,整数类型为0,等等。
  • 如果尝试写入已经关闭的channel会导致panic: runtime error: send on closed channel

如何判断channel已经关闭?

val, isOpen := <- ch
if isOpen {
    // channel还未关闭
} else {
    // channel已经关闭
}

循环读取channel:

for val := range ch { // 读到channel close会结束循环
    // ...
}

1.2.1 go标准库对channel的运用

go标准库广泛运用了channel,对一些可能需要用到回调函数的地方,用channel代替。

time包

func After(d Duration) <-chan Time 返回一个channel,并在指定的时间后写入这个channel;只写入一次。

func Tick(d Duration) <-chan Time 返回一个channel,每隔指定的时间写入一次这个channel;用户不需要关闭这个channel。

os/signal包

func Notify(c chan<- os.Signal, sig ...os.Signal) 当信号sig...到来时,写入channel c。

func Stop(c chan<- os.Signal) 取消Notify设置的信号。

1.3 channel多路复用

运用select语句可以实现channel的多路复用。

case从句的应用:

  • case <- channel: 只要channel有数据就触发,不关心具体的数据是啥。
  • case val := <- channel: 如果channel有数据就触发,将数据保存到val中。
  • case channel <- val: 如果channel可写就触发,并将变量val写入channel中。
  • default: 所有case从句关注的channel都不可读、不可写时触发,可实现channel的非阻塞模式。

1.3.1 channel的读写超时设置

设置读超时:

import (
    "fmt"
    "time"
)
...

select {
case <- time.After(time.Second*2):
    fmt.Println("timeout")
case i := <- ch:
    fmt.Println(i)
}

设置写超时:

import (
    "fmt"
    "time"
)
...

select {
case <- time.After(time.Second*2):
    fmt.Println("timeout")
case ch <- "hello":
    fmt.Println("ok")
}

1.4 goroutine与网络IO

goroutine与网络IO有很强的联系,go利用goroutine和操作系统提供的各种多路复用机制(在gc的实现中称作:netpoll),实现简单、高效网络IO编程。

一个echo server的例子:

package main

import (
    "log"
    "net"
)

func main() {
    log.SetFlags(log.LstdFlags | log.Lshortfile)

    listener, err := net.Listen("tcp", ":9999")
    if err != nil {
        log.Fatal(err)
    }

    for {
        conn, err := listener.Accept()
        if err != nil {
            log.Println(err)
            continue
        }

        // 为每个连接启动一个goroutine
        // 表面上看与每连接每线程模式没有区别
        go echoServe(conn)
    }
}

func echoServe(conn net.Conn) {
    var buf [1024]byte

    for {
        // 编程方式与阻塞IO没有区别
        rv, err := conn.Read(buf[:])
        if err != nil {
            log.Println(err)
            break
        }
        rv, err = conn.Write(buf[:rv])
        if err != nil {
            log.Println(err)
            continue
        }
    }
}

表面上看,类似每连接每线程,阻塞IO模式;

实际上当调用Read/Write时,会触发调度;调度器会寻找一个可读\可写\空闲的goroutine来调度,因此,网络读\写会阻塞goroutine但不会阻塞操作系统线程,因为调度器总能找到一个可调度的goroutine。

关于调度和网络IO的细节,将会在后文详细描述。

2. 实现

上文已经粗略介绍了go语言调度相关的语法,现在开始介绍调度相关的实现细节。

2.1 结构体、变量说明

与调度相关的主要代码在pkg/runtime/proc.c

  • 结构体:
    • G - goroutine
    • M - machine 代表操作系统线程
    • P - processor,M需要关联到一个处理器,才能被执行;runnable G保存在其中。
    • Sched - 调度器
  • 全局变量:
    • Sched runtime·sched - 全局调度器
    • M runtime·m0 - 空闲的machine,在rt0阶段用于填充m
    • G runtime·g0 - m0的空闲g,在rt0阶段用于填充g
    • runtime·gomaxprocs - 最大系统线程数量
    • runtime·allp - 所有P的数组,P的数量与runtime·gomaxprocs相等
  • 特殊变量:
    • m - 当前machine指针
    • g - 当前goroutine指针

src/pkg/runtime/runtime.h中这一段代码:

/*
 * Per-CPU declaration.
 *
 * "extern register" is a special storage class implemented by 6c, 8c, etc.
 * On the ARM, it is an actual register; elsewhere it is a slot in thread-
 * local storage indexed by a segment register. See zasmhdr in
 * src/cmd/dist/buildruntime.c for details, and be aware that the linker may
 * make further OS-specific changes to the compiler's output. For example,
 * 6l/linux rewrites 0(GS) as -16(FS).
 *
 * Every C file linked into a Go program must include runtime.h so that the
 * C compiler (6c, 8c, etc.) knows to avoid other uses of these dedicated
 * registers. The Go compiler (6g, 8g, etc.) knows to avoid them.
 */
extern  register    G*  g;
extern  register    M*  m;

注释中提到:register是一种被6c\8c支持的特殊存储类型,具体实现是将数据保存在线程本地存储(TLS)的槽中,将索引保存在段寄存器中;arm是例外,数据直接被保存在真实寄存器中。

在drawn_amd64中,g保存在0(GS),m保存在8(GS)

2.2 运行状态

goroutine的运行状态(src/pkg/runtime/runtime.h):

  • Gidle - 初始状态
  • Grunnable - 可运行,调度器会选在此状态的goroutine运行。
  • Grunning - 运行中,goroutine拥有控制权,正在运行中。
  • Gsyscall - 在调用syscall之前被设置,调用结束后会恢复为Grunnable
  • Gwaiting - 对goroutine执行park时被设置,结束后会恢复为Grunnable
  • Gdead - 已死亡,goexit0会将退出的goroutine设置为这个状态。

goroutine状态转移:

goroutine state

goroutine park: 暂停goroutine的执行,channel或IO操作都会导致goroutine park。

processor的运行状态(src/pkg/runtime/runtime.h):

  • Pidle
  • Prunning
  • Psyscall
  • Pgcstop
  • Pdead

2.3 自举过程

go类似c/c++/java,都是从一个固定的函数开始执行。在go中,这个函数是main.main

在执行main.main之前按照go语法,如果在一个包中定义了init函数,会先执行一次这个函数。

在src/pkg/runtime/proc.c中声明了这两个函数:

 extern void main·init(void);
 extern void main·main(void);

启动过程类似c/c++,从rt0开始;源代码在src/pkg/runtime/rt0_GOOS_GOARCH.s中以汇编语言编写。 对应不同的CPU、OS都有不同的实现。

rt0_GOOS_GOARCH.s各个CPU/OS实现只是略有不同,大体逻辑是相同的:

  1. 将argv,argc分别放入寄存器中,(amd64是:DI,SI)
  2. jump到main函数入口
  3. jump到_rt0_go函数入口

_rt0_go才是rt0的主要部分,这个函数定义在src/pkg/runtime/asm_GOARCH.s中。 amd64下,_rt0_go的流程是:

  1. 将argv,argc从寄存器中拷贝到栈中。
  2. 初始化runtime·g0的栈。
  3. 获取CPUID,如果能获取的话。
  4. 初始化tls(线程本地存储)。
  5. 将g0,m0设置为g,m,填充初始的g,m;此处g,m指针仅仅用于填充g0,m0。
  6. runtime·args:负责处理命令行参数,argv,argc在第1.步中已经保存在栈中,调用runtime.args时被当作参数使用。它只是简单的把这两个参数保存在全局的argv,argc中。
  7. runtime·osinit:实现与操作系统相关;但都会实现获取cpu核数,并设置runtime·ncpu字段。
  8. runtime·hashinit:跟AES算法相关,根据cpuid设置hash算法。
  9. runtime·schedinit:初始化调度器:这里做了一些与调度器初始化无关的工作;省略一些不重要操作后,执行过程大概是:
    1. systabinit - 初始化符号表?
    2. mallocinit - 初始化堆,内存分配。
    3. goargs - 初始化命令行参数:使用args函数中保存的argv和argc初始化os·Args对象。
    4. goenvs - 初始化环境变量:使用argv的后半部分初始syscall·envs对象。
    5. 根据GOMAXPROCS环境变量设置最大线程数。
    6. 根据GOCOPYSTACK环境变量设置runtime·copystack变量。
  10. 调用runtime·newproc(runtime·main,nil)创建一个新的G。
  11. 调用runtime·mstart,启动m。

按proc.c注释中提到,自举大体过程是:

  1. 调用osinit
  2. 调用schedinit
  3. 创建一个新的G
  4. 调用mstart启动G

新的G调用main,即go语言代码中入口函数。

main·main的调用栈应该是:

rt0 call graph

2.3 调度算法

调度算法的设计文档位于这里

最新的调度算法基于工作窃取(work-stealing)算法

2.3.1 处理器(Processor)

注意:go的processor不是指物理的CPU。

go在runtime中引入了结构体P,用于实现工作窃取算法。

结构体M表示操作系统线程,P表示一种可以用于执行go代码的资源。如果M要执行go代码,它必须关联一个P。当M空闲或者处于syscall状态时,也需要P。

P的数量精确的等于GOMAXPROCS,由于工作窃取算法的需要,所有的P被组织成数组。如果GOMAXPROCS数量在运行中改变了,需要完全停止程序运行(stop-world)并重新设置P数组的大小。

P *allp; // [GOMAXPROCS] P数组

无锁(lock-free)的空闲P链表:

P *idlep; // lock-free list

当M将要开始执行代码时,必须从链表中弹出P;当M结束执行代码后,需要将P压入链表。也就是说当M执行代码时必要要关联一个P。

2.3.2 调度

当一个G被创建,或者变成Grunnable状态,它会被压入当前P的runnable goroutine链表。当P结束执行G,先尝试从自己的runnable goroutine链表弹出一个G,如果这个链表为空,P随机选择一个牺牲者(其他的P),从中偷取1/2的runnable goroutine。

2.3.2 Syscalls/M Parking and Unparking

当一个M创建了一个新的G,必须确保有其他的M执行这个G(如果不是所有的M都在忙);同样的,当一个M进入syscall, 必须确保有其他的M执行go代码。

这里有两个办法:及时阻塞/取消阻塞M,或者利用某种自旋机制(忙等待)。这是性能和消耗不必要CPU周期的矛盾。采用自旋消耗CPU周期的办法,不管怎样,这在GOMAXPROCS=1的时候是没有效果的。

自旋分为两隔阶段:

阶段(1):与空闲M关联自旋以寻找新的G。

阶段(2):

当一个G被生成/M进入syscall/M从空闲转换到忙,必须确保至少1个M自旋(或者所有的P都忙);这样就确保了不存在可运行的G以其他方式运行。并且同时避免了过度的阻塞\取消阻塞M。

自旋基本是消极的(使用操作系统yield),但可以包含一小部分主动自旋(空转消耗CPU周期)。

2.3.3 总结

P-M-G的关系图:

P-M-G

2.4 调度实现

goroutine本质上是一个用户态协程,需要主动让出控制权以使其他goroutine获取执行时间。 让出控制权有以下几种方式:

  • 显式调用func runtime.Gosched()
  • 分配内存
  • 间接或直接调用syscall
  • 使用chan

因此,在执行计算密集型goroutine时,注意在适当的时机触发调度,使同一线程内的其他goroutine获得执行机会。

2.4.1 P-M-G的创建

G的创建

runtime·newproc(src/pkg/runtime/proc.c)负责创建G。

runtime·newproc在内部调用runtime·newproc1runtime·newproc1包含了绝大部分创建G的代码:

  1. 检查栈大小。
  2. 利用内存池机制,先检查当前P是否有已释放的G(m->p->gfree),如果有,直接使用。
  3. 如果内存池内没有已释放的G,则调用runtime·malg分配一个。
  4. 将新G设置为Grunnable状态,并调用runqput将其加入当前P的Runnable队列。
  5. 若果有P处于空闲状态、没有M自旋、G的入口函数不是runtime·main的时候(此时过于空闲了),调用wakeup唤醒一个P进行调度。
  6. 返回新的G。

由此可见,新创建的goroutine并不一定先执行,这就是go1.1以后引入的“抢占式进入”机制。

M的创建

M的创建比较复杂,它是按需创建的,也就是说worker线程是按需创建的。

newm(src/pkg/runtime/proc.c)负责创建M,并启动一个操作系统线程:

  1. 调用runtime·allocm分配一个M对象。
  2. 调用runtime·newosprocruntime·mstart为入口启动线程。

runtime·mstart(src/pkg/runtime/proc.c)为worker线程的真正执行体,其中先进行一些初始化工作,最后调用schedule进行调度。

在主G(入口函数为:runtime·main)中会调用newm,创建一个以sysmon为入口的worker线程。这个线程为系统级的daemon线程。sysmon中以死循环的方式执行以下操作:

  1. 以非阻塞的方式调用runtime·netpoll,从中找出能从网络IO中唤醒的G,并调用injectglist,将其插入调度器的runnable列表中(全局),调度触发时,有可能从这个全局runnable列表获取G。然后再循环调用startm,直到所有P都不处于Pidle状态。
  2. 调用retake,抢占长时间处于Psyscall状态的P。

startm(src/pkg/runtime/proc.c)调度一个M来执行P

2.4.2 Gosched的实现

func runtime.Gosched()的调用图:

gosched call graph

runtime·mcall(fn):

间接调用函数fn, 切换到m->g0的栈,并调用fn(g),fn必须永不返回,而是应该调用gogo(&g->sched)切换控制权,保障g继续运行。

runtime.gosched0:

此时由于使用runtime·macll间接调用了gosched0,所以此时已经切换到m(当前M)的上下文了。

将g状态设置为Grunnable。设置g->m = nil,设置m->curg = nil。

然后将需要被调度的goroutine放入全局调度器的运行队列中;调度器runtime·sched保存了一个全局的处于runnable状态的goruntine列表;注意:此操作会导致runtime·sched加锁。

判断m->lockedg是否为空,如果为空调度其他g而不是新创建的g,否则执行新创建的g

由此函数的实现可以发现,新创建的goroutine并不一定被先调度,这是go-1.1的修改“入口goroutine抢占式调度”。

goroutine本身的调度依然不是抢占式的,但goroutine入口是抢占式进入的。

schedule:

这是调度算法的核心,负责选择合适G执行。

先处理gc过程,如果gc处于stop world阶段,会先执行gc,直到stop world结束。

后面才开始选择G执行:

  1. 从全局调度器runtime·sched的runnable列表中选择。
  2. 从当前M的当前P(m->p)的runnable列表中选择。
  3. 从netpoll唤醒的goroutine中选择一个,非阻塞。
  4. 最多重试2*runtime·gomaxprocs次,工作偷取算法,从allp中随机选择一个P,如果正好是当前P重复步骤2,否则从选中的P中偷取一半runnable G到当前P中;并选择一个G。
  5. 最多重试2*runtime·gomaxprocs次,从全局调度器runtime·sched的idle P列表选择一个空闲的P,然后从P的runnable列表中选择。
  6. 从netpoll唤醒的goroutine中选择一个,阻塞。
  7. 重复3~6,直到找到一个可调度的G。
  8. 调用execute执行被选中的G。

schedule的实现已可以解释1.4节出现的echo server执行原理。

findrunnable:

获取一个可执行的G,这是阻塞操作,会阻塞当前线程,直到找到一个可调度的G。

execute:

将g状态设置为Grunning。设置g->m = m,设置m->curg = g。

调度goroutine运行在当前线程,他永远不会返回,因为其在内部调用runtime.gogo将控制权转交到goroutine了。

struct Gobuf - goroutine运行上下文,最重要的字段

uintptr sp; 栈指针
unitptr pc; 程序计数器

gogo函数负责恢复现场,并转移控制权(src/pkg/runtime/asm_amd64.s)

// void gogo(Gobuf*)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $0-8
    MOVQ    8(SP), BX       // gobuf
    MOVQ    gobuf_g(BX), DX
    MOVQ    0(DX), CX       // make sure g != nil
    get_tls(CX)
    MOVQ    DX, g(CX)
    MOVQ    gobuf_sp(BX), SP    // restore SP
    MOVQ    gobuf_ret(BX), AX
    MOVQ    gobuf_ctxt(BX), DX
    MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
    MOVQ    $0, gobuf_ret(BX)
    MOVQ    $0, gobuf_ctxt(BX)
    MOVQ    gobuf_pc(BX), BX
    JMP BX

这段代码具体工作就是:

  • 从Gobuf中恢复g、sp、ret、ctxt四个字段。
  • 跳转到pc,完成控制权转移。


blog comments powered by Disqus