Go语言中常见的并发模式( 三 )

下面的例子中,有两个订阅者分别订阅了全部主题和含有"golang"的主题:
import "path/to/pubsub"func main() { p := pubsub.NewPublisher(100*time.Millisecond, 10) defer p.Close() all := p.Subscribe() golang := p.SubscribeTopic(func(v interface{}) bool { if s, ok := v.(string); ok { return strings.Contains(s, "golang") } return false }) p.Publish("hello, world!") p.Publish("hello, golang!") go func() { for msg := range all { fmt.Println("all:", msg) } } () go func() { for msg := range golang { fmt.Println("golang:", msg) } } () // 运行一定时间后退出 time.Sleep(3 * time.Second)}在发布/订阅模型中,每条消息都会传送给多个订阅者 。发布者通常不会知道,也不关心哪一个订阅者正在接收主题消息 。订阅者和发布者可以在运行时动态添加,它们之间是一种松散的耦合关系,这使得系统的复杂性可以随时间的推移而增长 。在现实生活中,像天气预报之类的应用就可以应用这种并发模式 。
1.6.4 控制并发数很多用户在适应了Go语言强大的并发特性之后,都倾向于编写最大并发的程序,因为这样似乎可以提供最高的性能 。在现实中我们行色匆匆,但有时却需要我们放慢脚步享受生活,并发的程序也是一样:有时候我们需要适当地控制并发的程度,因为这样不仅可给其他的应用/任务让出/预留一定的CPU资源,也可以适当降低功耗缓解电池的压力 。
在Go语言自带的godoc程序实现中有一个vfs的包对应虚拟的文件系统,在vfs包下面有一个gatefs的子包,gatefs子包的目的就是为了控制访问该虚拟文件系统的最大并发数 。gatefs包的应用很简单:
import ( "golang.org/x/tools/godoc/vfs" "golang.org/x/tools/godoc/vfs/gatefs")func main() { fs := gatefs.New(vfs.OS("/path"), make(chan bool, 8)) // ...}其中vfs.OS("/path")基于本地文件系统构造一个虚拟的文件系统,然后gatefs.New基于现有的虚拟文件系统构造一个并发受控的虚拟文件系统 。并发数控制的原理在1.5节已经讲过,就是通过带缓存通道的发送和接收规则来实现最大并发阻塞:
var limit = make(chan int, 3)func main() { for _, w := range work { go func() { limit <- 1 w() <-limit }() } select{}}不过gatefs对此做一个抽象类型gate,增加了enter()和leave()方法分别对应并发代码的进入和离开 。当超出并发数目限制的时候,enter()方法会阻塞直到并发数降下来为止 。
type gate chan boolfunc (g gate) enter() { g <- true }func (g gate) leave() { <-g }gatefs包装的新的虚拟文件系统就是将需要控制并发的方法增加了对enter()和leave()的调用而已:
type gatefs struct { fs vfs.FileSystem gate}func (fs gatefs) Lstat(p string) (os.FileInfo, error) { fs.enter() defer fs.leave() return fs.fs.Lstat(p)}我们不仅可以控制最大的并发数目,而且可以通过带缓存通道的使用量和最大容量比例来判断程序运行的并发率 。当通道为空时可以认为是空闲状态,当通道满了时可以认为是繁忙状态,这对于后台一些低级任务的运行是有参考价值的 。
1.6.5 赢者为王采用并发编程的动机有很多:并发编程可以简化问题,例如一类问题对应一个处理线程会更简单;并发编程还可以提升性能,在一个多核CPU上开两个线程一般会比开一个线程快一些 。其实对提升性能而言,并不是程序运行速度快就表示用户体验好,很多时候程序能快速响应用户请求才是最重要的,当没有用户请求需要处理的时候才合适处理一些低优先级的后台任务 。
假设我们想快速地搜索“golang”相关的主题,我们可能会同时打开必应、谷歌或百度等多个检索引擎 。当某个搜索最先返回结果后,就可以关闭其他搜索页面了 。因为受网络环境和搜索引擎算法的影响,某些搜索引擎可能很快返回搜索结果,某些搜索引擎也可能等到他们公司倒闭也没有完成搜索 。我们可以采用类似的策略来编写这个程序:
func main() { ch := make(chan string, 32) go func() { ch <- searchByBing("golang") }() go func() { ch <- searchBygoogle("golang") }() go func() { ch <- searchByBaidu("golang") }() fmt.Println(<-ch)}首先,创建了一个带缓存通道,通道的缓存数目要足够大,保证不会因为缓存的容量引起不必要的阻塞 。然后开启了多个后台线程,分别向不同的搜索引擎提交搜索请求 。当任意一个搜索引擎最先有结果之后,都会马上将结果发到通道中(因为通道带了足够的缓存,这个过程不会阻塞) 。但是最终只从通道取第一个结果,也就是最先返回的结果 。


推荐阅读