新版本

前面的课程中,我们使用的proto的go生成器,使用的命令是

go get github.com/golang/protobuf/protoc-gen-go,在https://www.grpc.io/docs/languages/go/quickstart/ 中,我们发现

image-20220513194110535

官方使用的和我们使用的并不一致。

github的方式,需要使用--go_out=plugins=grpc 来去进行生成,而在golang.org方式中,弃用了这种方式,使用protoc-gen-go将不在支持gRPC service的定义,需要使用新的插件protoc-gen-go-grpc

所以,这里,我们以新的方式,重新实现一下服务端和客户端

1. 使用google.golang.org/protobuf

  1. 安装插件

    $ go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
    $ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
    
    1
    2

    安装完成后会在gopath下的bin目录下生成

    image-20220513195044682

  2. 利用proto文件重新生成go文件

    protoc  --go_out=./service --go-grpc_out=./service  pbfile\product.proto
    
    1
  3. 修改之前的service实现

    package service
    
    import (
    	"context"
    )
    
    var ProductService = &productService{}
    
    type productService struct {
    }
    
    func (p *productService) GetProductStock(ctx context.Context, request *ProductRequest) (*ProductResponse, error) {
    	return &ProductResponse{ProdStock: request.ProdId}, nil
    }
    func (p *productService) mustEmbedUnimplementedProdServiceServer() {}
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
  4. 测试

2. import使用

用于导入其他proto文件

// 这个就是protobuf的中间文件

// 指定的当前proto语法的版本,有2和3
syntax = "proto3";
//从执行 protoc这个命令的当前目录开始算起,
import "pbfile/user.proto";

option go_package="../service";

// 指定等会文件生成出来的package
package service;

// 定义request model
message ProductRequest{
  int32 prod_id = 1; // 1代表顺序
}

// 定义response model
message ProductResponse{
  int32 prod_stock = 1; // 1代表顺序
  User user = 2;
}

// 定义服务主体
service ProdService{
  // 定义方法
  rpc GetProductStock(ProductRequest) returns(ProductResponse);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

2.1 Any

任意类型

// 使用any类型,需要导入这个
import "google/protobuf/any.proto";

// 定义入参消息
message HelloParam{
  // any,代表可以是任何类型
  google.protobuf.Any data = 1;
}

1
2
3
4
5
6
7
8
9
// 这个就是protobuf的中间文件

// 指定的当前proto语法的版本,有2和3
syntax = "proto3";
//从执行 protoc这个命令的当前目录开始算起,
import "user.proto";
// 使用any类型,需要导入这个
import "google/protobuf/any.proto";

option go_package="../service";

// 指定等会文件生成出来的package
package service;

// 定义request model
message ProductRequest{
  int32 prod_id = 1; // 1代表顺序
}

message Content {
  string msg = 1;
}
// 定义response model
message ProductResponse{
  int32 prod_stock = 1; // 1代表顺序
  User user = 2;
  google.protobuf.Any data = 3;
}

// 定义服务主体
service ProdService{
  // 定义方法
  rpc GetProductStock(ProductRequest) returns(ProductResponse);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (p *productService) GetProductStock(context context.Context, request *ProductRequest) (*ProductResponse, error) {
	//实现具体的业务逻辑
	stock := p.GetStockById(request.ProdId)
	user := User{Username: "mszlu"}
	content := Content{Msg: "mszlu msg..."}
	//转换成any类型
	any, _ := anypb.New(&content)
	return &ProductResponse{ProdStock: stock, User: &user, Data: any}, nil
}

1
2
3
4
5
6
7
8
9
10

3. stream

在 HTTP/1.1 的时代,同一个时刻只能对一个请求进行处理或者响应,换句话说,下一个请求必须要等当前请求处理完才能继续进行。

HTTP/1.1需要注意的是,在服务端没有response的时候,客户端是可以发起多个request的,但服务端依旧是顺序对请求进行处理, 并按照收到请求的次序予以返回。

HTTP/2 的时代,多路复用的特性让一次同时处理多个请求成为了现实,并且同一个 TCP 通道中的请求不分先后、不会阻塞,HTTP/2 中引入了流(Stream) 和 帧(Frame) 的概念,当 TCP 通道建立以后,后续的所有操作都是以流的方式发送的,而二进制帧则是组成流的最小单位,属于协议层上的流式传输。

HTTP/2 在一个 TCP 连接的基础上虚拟出多个 Stream, Stream 之间可以并发的请求和处理, 并且 HTTP/2 以二进制帧 (frame) 的方式进行数据传送, 并引入了头部压缩 (HPACK), 大大提升了交互效率

3.1 定义

 1  // 普通 RPC
 2  rpc SimplePing(PingRequest) returns (PingReply);
 3
 4  // 客户端流式 RPC
 5  rpc ClientStreamPing(stream PingRequest) returns (PingReply);
 6
 7  // 服务器端流式 RPC
 8  rpc ServerStreamPing(PingRequest) returns (stream PingReply);
 9
10  // 双向流式 RPC
11  rpc BothStreamPing(stream PingRequest) returns (stream PingReply);

1
2
3
4
5
6
7
8
9
10
11
12

stream关键字,当该关键字修饰参数时,表示这是一个客户端流式的 gRPC 接口;当该参数修饰返回值时,表示这是一个服务器端流式的 gRPC 接口;当该关键字同时修饰参数和返回值时,表示这是一个双向流式的 gRPC 接口。

3.2 客户端流

定义:

rpc UpdateStockClientStream(stream ProductRequest) returns(ProductResponse);
1
//....	
stream, err := prodClient.UpdateProductStockClientStream(context.Background())
	if err != nil {
		log.Fatal("获取流出错", err)
	}
	rsp := make(chan struct{}, 1)
	go prodRequest(stream, rsp)
	select {
	case <-rsp:
		recv, err := stream.CloseAndRecv()
		if err != nil {
			log.Fatal(err)
		}
		stock := recv.ProdStock
		fmt.Println("客户端收到响应:", stock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16

func prodRequest(stream service.ProdService_UpdateProductStockClientStreamClient, rsp chan struct{}) {
	count := 0
	for {
		request := &service.ProductRequest{
			ProdId: 123,
		}
		err := stream.Send(request)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second)
		count++
		if count > 10 {
			rsp <- struct{}{}
			break
		}
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

func (p *productService) UpdateProductStockClientStream(stream ProdService_UpdateProductStockClientStreamServer) error {
	count := 0
	for {
		//源源不断的去接收客户端发来的信息
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				return nil
			}
			return err
		}
		fmt.Println("服务端接收到的流", recv.ProdId, count)
		count++
		if count > 10 {
			rsp := &ProductResponse{ProdStock: recv.ProdId}
			err := stream.SendAndClose(rsp)
			if err != nil {
				return err
			}
			return nil
		}
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

3.3 服务端流

定义:

rpc GetProductStockServerStream(ProductRequest) returns(stream ProductResponse);
1

func (p *productService) GetProductStockServerStream(request *ProductRequest, stream ProdService_GetProductStockServerStreamServer) error {
	count := 0
	for {
		rsp := &ProductResponse{ProdStock: request.ProdId}
		err := stream.Send(rsp)
		if err != nil {
			return err
		}
		time.Sleep(time.Second)
		count++
		if count > 10 {
			return nil
		}
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
request := &service.ProductRequest{
		ProdId: 123,
	}

	stream, err := prodClient.GetProductStockServerStream(context.Background(), request)
	if err != nil {
		log.Fatal("获取流出错", err)
	}

	for {
		recv, err := stream.Recv()
		if err != nil {
			if err == io.EOF {
				fmt.Println("客户端数据接收完成")
				err := stream.CloseSend()
				if err != nil {
					log.Fatal(err)
				}
				break
			}
			log.Fatal(err)
		}
		fmt.Println("客户端收到的流", recv.ProdStock)
	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

3.4 双向流

stream, err := prodClient.SayHelloStream(context.Background())

	for {
		request := &service.ProductRequest{
			ProdId: 123,
		}
		err = stream.Send(request)
		if err != nil {
			log.Fatal(err)
		}
		time.Sleep(time.Second)
		recv, err := stream.Recv()
		if err != nil {
			log.Fatal(err)
		}
		//websocket
		fmt.Println("客户端收到的流信息", recv.ProdStock)
	}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

func (p *productService) SayHelloStream(stream ProdService_SayHelloStreamServer) error {
	for {
		recv, err := stream.Recv()
		if err != nil {
			return nil
		}
		fmt.Println("服务端收到客户端的消息", recv.ProdId)
		time.Sleep(time.Second)
		rsp := &ProductResponse{ProdStock: recv.ProdId}
		err = stream.Send(rsp)
		if err != nil {
			return nil
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16