并发

教程地址:点击去往视频教程open in new window

1. 什么是并发?

先来理解CPU时间片的概念

image-20240314220526478

  • CPU同一时刻只能执行一个任务,每个任务执行时会被分配一个时间段,称之为时间片
  • CPU在不同的时间片执行不同的任务(线程/进程),会在任务之间进行切换,切换时保留上一个任务的状态,再次切换回原任务时,加载此状态继续运行,这个过程就是上下文切换
  • 中断(比如鼠标和键盘操作)会导致上下文切换。

了解了上面的知识,我们来看一个例子:

func main() {
	start := time.Now().UnixMilli()
	task1()
	task2()
	end := time.Now().UnixMilli() - start
	fmt.Printf("总共用时:%d \n", end)
}

func task2() {
	time.Sleep(time.Second * 5)
	fmt.Println("task2执行")
}
func task1() {
	time.Sleep(time.Second * 3)
	fmt.Println("task1执行")
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

上面的例子,用时8s,是因为CPU在执行的时候先执行task1,后执行task2,这种我们称之为串行

func main() {
	start := time.Now().UnixMilli()
	var w sync.WaitGroup
	w.Add(2)
	go task1(&w)
	go task2(&w)
	w.Wait()
	end := time.Now().UnixMilli() - start
	fmt.Printf("总共用时:%d \n", end)
}

func task2(w *sync.WaitGroup) {
	time.Sleep(time.Second * 5)
	fmt.Println("task2执行")
	w.Done()
}

func task1(w *sync.WaitGroup) {
	time.Sleep(time.Second * 3)
	fmt.Println("task1执行")
	w.Done()
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

上述的例子,我们只用了5s,是因为CPU在两个任务之间不停地切换执行(分配时间片),这种我们称之为并发,注意:CPU的速度非常快,我们感觉就像两个任务在同时运行一样

img

上面多了一个概念叫并行,这是因为现代计算机都是多核,也就是多个CPU,在多个CPU之间,是可以做到真正的同时运行的,而不是靠分配时间片的方式近似同时运行,这种在一个时间片内可以同时处理多个任务的方式就叫做并行

在编程中,并行和并发,我们一般都统称为并发

2. 进程,线程,协程

2.1 进程

进程是应用程序运行时的抽象。一个进程包含两部分:

  • 静态部分:程序运行需要的代码和数据
  • 动态部分:程序运行期间的状态(程序计数器、堆、栈…)

image-20240315095512280

比如:启动一个微信,启动谷歌浏览器,就是启动了一个进程。

当我们执行go run main.go时,这个应用程序就会被加载到内存中,会独占一片内存。

此时运行的程序就是进程,此时进程的内存布局为:

img

  • 内核空间具有最高权限,可以直接访问所有资源
  • 用户空间只能访问受限资源,不能直接访问内存等硬件设备,要想访问这些特权资源,必须通过系统调用

创建一个进程时,会将上述内存布局COPY一份,开销很大。

对一个进程来说,一般运行在用户态,进程上下文切换时,会在用户态和内核态之间转换,开销较大

img

2.2 线程

线程是更加轻量级的运行时抽象

  • 线程是进程中的一个执行单元。
  • 一个进程至少有一个线程。也就是说一个进程拥有多个线程。
  • 线程是调度的基本单位。CPU(内核)任务调度以线程为单位。
  • 每个线程都拥有自己的栈,内核也有为线程准备的内核栈。
  • 线程也会进行上下文切换。
  • 两个线程属于不同进程时,上下文切换等同于进程上下文切换(资源不共享)。
  • 两个线程属于同一进程时,上下文切换开销更小(因为有共享资源)

线程只包含运行时的状态:

  • 静态部分由进程提供
  • 包括了执行所需的最小状态(主要是寄存器和栈)

内存布局为:

img

根据线程是否受内核直接管理,可以把线程分为两类:用户级线程和内核级线程。

  • 在用户级线程中,线程的创建、管理等所有工作都由应用程序基于线程库完成,内核意识不到线程的存在。
  • 在内核级线程中,线程管理的所有工作都由内核完成。

img

2.3 协程

协程就是用户态下的轻量级线程,英文叫Coroutine,Go语言的协程叫做Goroutine(是由Go 和 Coroutine拼接出来的词)。

  • 协程的调度由应用程序控制
  • 协程的上下文切换由于不需要内核参与,所以开销很小

我们通过线程模型来加深理解一下

img

上图属于内核线程模型,内核线程和用户线程是一对一关系,线程的创建、销毁、切换工作都是有内核完成的。应用程序不参与线程的管理工作,只能调用内核级线程编程接口。每个用户线程都会被绑定到一个内核线程。线程的创建与删除,调度等都需要系统内核参与,成本大。

img

上图属于用户级线程模型,内核线程和用户线程是一对多关系,线程的创建、销毁以及线程之间的协调、同步等工作都是在用户态完成,具体来说就是由应用程序的线程库来完成。从宏观上来看,任意时刻每个进程只能够有一个线程在运行,且只有一个处理器内核会被分配给该进程。

一个线程发生阻塞可能会影响整个进程,从而阻塞所有线程。

img

上图是两级线程模型,用户态线程与内核态线程是多对多关系(N : M)。

其线程创建在用户空间中完成,线程的调度和同步也在应用程序中进行。一个应用程序中的多个用户级线程被绑定到一些(小于或等于用户级线程的数目)内核级线程上。

3. golang的线程模型

Golang在底层实现了混合型线程模型。

img

M代表着系统线程,一个M关联一个KSE,即两级线程模型中的系统线程。G为Groutine,即两级线程模型的的应用级线程。M与G的关系是N:M。

4. Goroutine

在go语言中,我们只需要使用go关键字就可以很轻松的开启一个协程,Go程序会智能地将 goroutine 中的任务合理地分配给每个CPU,从而实现并发执行,轻松实现高并发程序。

例子:

func hello(i int) {
	fmt.Println("Hello Goroutine!" , i)
}

func main()  {
	for i := 0; i < 10; i++ {
		go hello(i)
	}
	fmt.Println("main goroutine done!")
	time.Sleep(time.Second * 2)
}
1
2
3
4
5
6
7
8
9
10
11

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为10个goroutine是并发执行的,而goroutine的调度是随机的。

如果我们不加休眠代码会怎么样呢?

func hello() {
    fmt.Println("Hello Goroutine!")
}
func main() {
    go hello() // 启动一个goroutine去执行hello函数
    fmt.Println("main goroutine done!")
}
1
2
3
4
5
6
7

这一次的执行结果只打印了main goroutine done!,并没有打印Hello Goroutine!。

  • 在程序启动时,Go程序就会为main()函数创建一个默认的goroutine

  • 当main()函数返回的时候该goroutine就结束了,所有在main()函数中启动的goroutine会一同结束

  • 所以我们要想办法让main函数等一等hello函数

5. channel

go协程之间如何通信呢?答案就是Channel

虽然我们可以通过共享内存(比如全局变量)的方式进行通信,但共享内存容易引发竞态问题,就需要加锁处理,这样会造成性能问题。

Go语言的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存不是通过共享内存而实现通信

如果说goroutine是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel是一种类型,一种引用类型。声明通道类型的格式如下:

var 变量名称 chan 类型

比如:

func main() {
    // 声明一个传递整型的通道
	var ch1 chan int
    //channel的零值为nil
	fmt.Println(ch1)//nil
} 
1
2
3
4
5
6

声明通道后需要使用make函数初始化之后才能使用。

创建channel的格式如下:

 make(chan 元素类型, [缓冲大小])//缓冲大小可选
1

通道有发送(send)接收(receive)关闭(close)三种操作。

发送和接收都使用<-符号。

func main() {
	ch := make(chan int, 1)
	//发送 将10写入通道
	ch <- 10
	//接收 从ch中取值
	x := <-ch
	fmt.Println(x)
	//如果close channel则不能在做发送操作(写操作)
	close(ch)
	ch <- 10
}
1
2
3
4
5
6
7
8
9
10
11

关于关闭通道需要注意的事情是,只有在通知接收方goroutine所有的数据都发送完毕的时候才需要关闭通道。通道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致panic。

5.1 无缓冲和有缓冲

	//无缓冲 相当于缓冲大小为0
	ch := make(chan int)
	//有缓冲
	ch := make(chan int, 5)
1
2
3
4

在前面我们讲过,channel相当于一个队列

Buffered Channel vs Unbuffered Channel In Go

缓冲大小,就是这个队列的大小,如果队列大小为0,也就是无缓冲的时候,会发生什么现象呢?

func main() {
	ch := make(chan int, 0)
	ch <- 10
	fmt.Println("可以执行到这里吗?")
}
1
2
3
4
5

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!
1

为什么会出现deadlock错误呢?

因为我们使用ch := make(chan int)创建的是无缓冲的通道,由于缓冲大小为0,所以必须有接收者才行,你可以理解为到付快递,必须有人签收才行

上方代码,由于没有接收者,所以代码会阻塞在ch <- 10这一行代码形成死锁,解决办法就是,有一个接收者即可:

func recv(c chan int) {
    ret := <-c
    fmt.Println("接收成功", ret)
}
func main() {
    ch := make(chan int)
    go recv(ch) // 启用goroutine从通道接收值
    ch <- 10
    fmt.Println("发送成功")
}
1
2
3
4
5
6
7
8
9
10

无缓冲通道上的发送操作会阻塞,直到另一个goroutine在该通道上执行接收操作,这时值才能发送成功,两个goroutine将继续执行。相反,如果接收操作先执行,接收方的goroutine将阻塞,直到另一个goroutine在该通道上发送一个值。

理解了无缓冲,有缓冲就相当于,送快递,先把快递放到快递柜,接收者可以随时去快递柜取

func main() {
    ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道
    ch <- 10
    fmt.Println("发送成功")
}
1
2
3
4
5

这时候,由于设置了缓冲大小,所以上述代码并不会报错,因为10存入了快递柜

5.2 从通道取值

  • 第一种

    func main() {
    	ch1 := make(chan int)
    	// 开启goroutine将0~100的数发送到ch1中
    	go func() {
    		for i := 0; i < 100; i++ {
    			ch1 <- i
    		}
    		close(ch1)
    	}()
    	for {
    		i, ok := <-ch1 // 通道关闭后再取值ok=false
    		if !ok {
    			break
    		}
    		fmt.Println(i)
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
  • 第二种

    func main() {
    	ch1 := make(chan int)
    	// 开启goroutine将0~100的数发送到ch1中
    	go func() {
    		for i := 0; i < 100; i++ {
    			ch1 <- i
    		}
    		close(ch1)
    	}()
    	for i := range ch1 { // 通道关闭后会退出for range循环
    		fmt.Println(i)
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
  • 第三种

    func main() {
    	ch1 := make(chan int)
    	// 开启goroutine将0~100的数发送到ch1中
    	go func() {
    		for i := 0; i < 100; i++ {
    			ch1 <- i
    		}
    		close(ch1)
    	}()
    	for {
    		select {
    		case i, ok := <-ch1:
    			if ok {
    				fmt.Println(i)
    			}
    		default:
    			fmt.Println("default")
    		}
    	}
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20

6. select

Go 语言中的 select 语句是一种用于多路复用通道的机制,它允许在多个通道上等待并处理消息。

使用 select 语句能够更加高效地管理多个通道。

使用方式:

select {
    case <- channel1:
        // channel1准备好了
    case data := <- channel2:
        // channel2准备好了,并且可以读取到数据data
    case channel3 <- data:
        // channel3准备好了,并且可以往其中写入数据data
    default:
        // 没有任何channel准备好了
}

1
2
3
4
5
6
7
8
9
10
11

案例:

func main() {
   ch := make(chan int)
   go func() {
      time.Sleep(3 * time.Second)
      ch <- 1
   }()

   select {
   case data, ok := <-ch:
      if ok {
         fmt.Println("接收到数据: ", data)
      } else {
         fmt.Println("通道已被关闭")
      }
   case <-time.After(2 * time.Second):
      fmt.Println("超时了!")
   }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

7. 锁

有时候在Go代码中可能会存在多个goroutine同时操作一个资源(临界区),这种情况会发生竞态问题(数据竞态)。类比现实生活中的例子有十字路口被各个方向的的汽车竞争;还有火车上的卫生间被车厢里的人竞争。

举个例子:

var x int64
var wg sync.WaitGroup

func add() {
    for i := 0; i < 5000; i++ {
        x = x + 1
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

上面的代码中我们开启了两个goroutine去累加变量x的值,这两个goroutine在访问和修改x变量的时候就会存在数据竞争,导致最后的结果与期待的不符

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同时只有一个goroutine可以访问共享资源。Go语言中使用sync包的Mutex类型来实现互斥锁。

var x int64
var wg sync.WaitGroup
var lock sync.Mutex

func add() {
    for i := 0; i < 5000; i++ {
        lock.Lock() // 加锁
        x = x + 1
        lock.Unlock() // 解锁
    }
    wg.Done()
}
func main() {
    wg.Add(2)
    go add()
    go add()
    wg.Wait()
    fmt.Println(x)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

使用互斥锁能够保证同一时间有且只有一个goroutine进入临界区,其他的goroutine则在等待锁;当互斥锁释放后,等待的goroutine才可以获取锁进入临界区,多个goroutine同时等待一个锁时,唤醒的策略是随机的。

互斥锁是完全互斥的,但是有很多实际的场景下是读多写少的,当我们并发的去读取一个资源不涉及资源修改的时候是没有必要加锁的,这种场景下使用读写锁是更好的一种选择。读写锁在Go语言中使用sync包中的RWMutex类型。

读写锁分为两种:读锁和写锁。当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁会继续获得锁,如果是获取写锁就会等待;当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是写锁都会等待。

var (
    x      int64
    wg     sync.WaitGroup
    lock   sync.Mutex
    rwlock sync.RWMutex
)

func write() {
    // lock.Lock()   // 加互斥锁
    rwlock.Lock() // 加写锁
    x = x + 1
    time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
    rwlock.Unlock()                   // 解写锁
    // lock.Unlock()                     // 解互斥锁
    wg.Done()
}

func read() {
    // lock.Lock()                  // 加互斥锁
    rwlock.RLock()               // 加读锁
    time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
    rwlock.RUnlock()             // 解读锁
    // lock.Unlock()                // 解互斥锁
    wg.Done()
}

func main() {
    start := time.Now()
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go write()
    }

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go read()
    }

    wg.Wait()
    end := time.Now()
    fmt.Println(end.Sub(start))
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

如果使用互斥锁,我们会发现耗时很长,当读多写少的场景,使用读写锁可以提供性能