golang中有2种方式同步程序,一种使用channel,另一种使用sync.WaitGroup。最近在使用golang写一个比较简单的功能 —- host1主机需要先在本机起一个TCP监听,起来后给host2主机发送指令,让其主动给host1主机监听的端口进行连接。最终使用了sync.WaitGroup实现了该功能。本篇就结合一些示例来看下两者的使用。

一、channel并行同步

比如有三个需要取数据的程序同时进行,但是终需要同步并返回数据。我们可以按如下代码操作:

 1package main
 2import (
 3    "fmt"
 4    "time"
 5)
 6func main() {
 7    messages := make(chan int)
 8    go func() {
 9        time.Sleep(time.Second * 3)
10        messages <- 1
11    }()
12    go func() {
13        time.Sleep(time.Second * 2)
14        messages <- 2
15    }()
16    go func() {
17        time.Sleep(time.Second * 1)
18        messages <- 3
19    }()
20    go func() {
21        for i := range messages {
22            fmt.Println(i)
23        }
24    }()
25    time.Sleep(time.Second * 5)
26}

最终取回的结果是3 2 1 ,但是如果该代码中如果不加time.sleep 5秒的动作,程序执行时会出现主进程还未等各个进程执行完成就结束了。因为go函数可以简单理论为shell里的&操作。当然遇到这样的问题,使用sync.WaitGroup是可以解决的。但如果不用sync.WaitGroup,还是使用channel去处理能不能解决呢?

当然是可以的,我们可以再创建一个无缓存的channel,由于该channel是阻塞的,在所有的数据未取出前,主程序就不退出。具体做法如下:

 1package main
 2import (
 3    "fmt"
 4    "time"
 5)
 6func main() {
 7    messages := make(chan int)
 8    // Use this channel to follow the execution status
 9    // of our goroutines :D
10    done := make(chan bool)
11    go func() {
12        time.Sleep(time.Second * 3)
13        messages <- 1
14        done <- true
15    }()
16    go func() {
17        time.Sleep(time.Second * 2)
18        messages <- 2
19        done <- true
20    }()
21    go func() {
22        time.Sleep(time.Second * 1)
23        messages <- 3
24        done <- true
25    }()
26    go func() {
27        for i := range messages {
28            fmt.Println(i)
29        }
30    }()
31    for i := 0; i < 3; i++ {
32        <-done
33    }
34}

这里上面每个channel执行完成后,会向done这样一个channel里向true,在true的结果没有取出之前,程序就会一直阻塞,直接所有的程序都完成。可能聪明的同学会觉得不需要这么麻烦,只需要把示例1中的代码最后一个go func()去掉,而且把sleep 5秒也去掉,直接改为如下循环取出就可以:

1for i := range messages {
2    fmt.Println(i)
3}

实际执行的时候呢?看下图:

channel-deadlock
channel-deadlock

二、sync.WaitGroup并行同步处理

sync包提供了基本同步和互持锁。其可以操作的类型有Cond、Locker、Map、Mutex、Once、Pool、RWMutex、WailtGroup。这里只说WaitGroup,WaitGroup提供了三个方法:Add()用来添加计数。Done()用来在操作结束时调用,使计数减一。Wait()用来等待所有的操作结束,即计数变为0,该函数会在计数不为0时等待,在计数为0时立即返回。同样是上面的示例,使用sync.WailtGroup解决比较容易,如下:

 1package main
 2import (
 3    "fmt"
 4    "sync"
 5    "time"
 6)
 7func main() {
 8    messages := make(chan int)
 9    var wg sync.WaitGroup
10    // you can also add these one at
11    // a time if you need to
12    wg.Add(3)
13    go func() {
14        defer wg.Done()
15        time.Sleep(time.Second * 3)
16        messages <- 1
17    }()
18    go func() {
19        defer wg.Done()
20        time.Sleep(time.Second * 2)
21        messages <- 2
22    }()
23    go func() {
24        defer wg.Done()
25        time.Sleep(time.Second * 1)
26        messages <- 3
27    }()
28    go func() {
29        for i := range messages {
30            fmt.Println(i)
31        }
32    }()
33    wg.Wait()
34}

在一个wait组里我们增加了三个计数器,每完成一个减1,直到为0时,wait组结束。其同样适用于多线程采集:

 1package main
 2import (
 3    "fmt"
 4    "io/ioutil"
 5    "log"
 6    "net/http"
 7    "sync"
 8)
 9func main() {
10    urls := []string{
11        "http://api.douban.com/v2/book/isbn/9787218087351",
12        "http://ip.taobao.com/service/getIpInfo.php?ip=202.101.172.35",
13        "https://jsonplaceholder.typicode.com/todos/1",
14    }
15    jsonResponses := make(chan string)
16    var wg sync.WaitGroup
17    wg.Add(len(urls))
18    for _, url := range urls {
19        go func(url string) {
20            defer wg.Done()
21            res, err := http.Get(url)
22            if err != nil {
23                log.Fatal(err)
24            } else {
25                defer res.Body.Close()
26                body, err := ioutil.ReadAll(res.Body)
27                if err != nil {
28                    log.Fatal(err)
29                } else {
30                    jsonResponses <- string(body)
31                }
32            }
33        }(url)
34    }
35    go func() {
36        for response := range jsonResponses {
37            fmt.Println(response)
38        }
39    }()
40    wg.Wait()
41}

上面是采集3个json数据的返回结果。当然也可以参看下官方的示例,官方的示例和这里略有差别,这个是一次通过len增加了n个wait任务,官方的每处理前就先增加一个。

 1package main
 2import (
 3    "sync"
 4)
 5type httpPkg struct{}
 6func (httpPkg) Get(url string) {}
 7var http httpPkg
 8func main() {
 9    var wg sync.WaitGroup
10    var urls = []string{
11        "http://www.golang.org/",
12        "http://www.google.com/",
13        "http://www.somestupidname.com/",
14    }
15    for _, url := range urls {
16        // Increment the WaitGroup counter.
17        wg.Add(1)
18        // Launch a goroutine to fetch the URL.
19        go func(url string) {
20            // Decrement the counter when the goroutine completes.
21            defer wg.Done()
22            // Fetch the URL.
23            http.Get(url)
24        }(url)
25    }
26    // Wait for all HTTP fetches to complete.
27    wg.Wait()
28}

参考页面:how-to-wait-for-all-goroutines-to-finish-without-using-time-sleep