中间件

go-zero框架中内置了很多中间件,我们分别介绍一下这些中间件的作用和如何使用

  • 鉴权管理中间件 AuthorizeHandler
  • 熔断中间件 BreakerHandler
  • 内容安全中间件 ContentSecurityHandler
  • 解密中间件 CryptionHandler
  • 压缩管理中间件 GunzipHandler
  • 日志中间件 LogHandler
  • ContentLength 管理中间件 MaxBytesHandler
  • 限流中间件 MaxConnsHandler
  • 指标统计中间件 MetricHandler
  • 普罗米修斯指标中间件 PrometheusHandler
  • panic 恢复中间件 RecoverHandler
  • 负载监控中间件 SheddingHandler
  • 超时中间件 TimeoutHandler
  • 链路追踪中间件 TraceHandler

1. 鉴权管理中间件

这部分就是之前使用的jwt认证,当携带token时,会解析token,判断token是否有效

// Authorize returns a middleware that authenticates requests with a JWT.
//
// The token is parsed from the request with secret and the optional previous
// secret taken from opts. A request is rejected through unauthorized (with the
// configured callback) when the token cannot be parsed, is not valid, or does
// not carry map claims. Otherwise every non-standard claim is stored in the
// request context under its claim name before next is invoked.
func Authorize(secret string, opts ...AuthorizeOption) func(http.Handler) http.Handler {
	var options AuthorizeOptions
	for i := range opts {
		opts[i](&options)
	}

	tokenParser := token.NewTokenParser()

	return func(next http.Handler) http.Handler {
		serve := func(w http.ResponseWriter, r *http.Request) {
			// Parse the token carried by the request.
			parsed, err := tokenParser.ParseToken(r, secret, options.PrevSecret)
			switch {
			case err != nil:
				unauthorized(w, r, err, options.Callback)
				return
			case !parsed.Valid:
				unauthorized(w, r, errInvalidToken, options.Callback)
				return
			}

			// Extract the JWT payload.
			payload, isMap := parsed.Claims.(jwt.MapClaims)
			if !isMap {
				unauthorized(w, r, errNoClaims, options.Callback)
				return
			}

			reqCtx := r.Context()
			for name, value := range payload {
				switch name {
				case jwtAudience, jwtExpire, jwtId, jwtIssueAt, jwtIssuer, jwtNotBefore, jwtSubject:
					// Standard claims are not propagated.
					continue
				}

				// Expose the custom claim to downstream handlers.
				reqCtx = context.WithValue(reqCtx, name, value)
			}

			next.ServeHTTP(w, r.WithContext(reqCtx))
		}

		return http.HandlerFunc(serve)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

2. 熔断中间件

断路器又叫熔断器,是一种保护机制,用于保护服务调用链路中的服务不被过多的请求压垮。当服务调用链路中的某个服务出现异常时,断路器会将该服务的调用请求拒绝,从而保护服务调用链路中的其他服务不被压垮。

img

img

  • go-zero 中采用滑动窗口来进行数据采集,目前以 10s 为一个窗口,单个窗口有 40 个桶,然后对窗口内采集的数据采用 Google SRE 算法计算是否开启熔断
  • HTTP 以请求方法+路由作为统计维度,用 HTTP 状态码 >= 500(即 5xx)作为错误采集指标进行统计
  • gRPC 客户端以 rpc 方法名作为统计维度,用 grpc 的错误码为 codes.DeadlineExceeded, codes.Internal, codes.Unavailable, codes.DataLoss, codes.Unimplemented 作为错误采集指标进行统计
  • gRPC 服务端以 rpc 方法名称作为统计维度,用 grpc 的错误作为错误采集指标进行统计
// BreakerHandler returns a circuit-breaker middleware for a single route.
//
// One breaker is created per method+path pair (for example GET /user/info)
// and named by joining the two with breakerSeparator. When the breaker does
// not allow a request, a drop is recorded on metrics, the rejection is logged
// and 503 is written. Otherwise the response status decides the outcome:
// codes below 500 are accepted, all others are rejected with the status text
// as the reason.
func BreakerHandler(method, path string, metrics *stat.Metrics) func(http.Handler) http.Handler {
	name := strings.Join([]string{method, path}, breakerSeparator)
	brk := breaker.NewBreaker(breaker.WithName(name))

	return func(next http.Handler) http.Handler {
		guarded := func(w http.ResponseWriter, r *http.Request) {
			// Ask the breaker whether this request may pass.
			promise, err := brk.Allow()
			if err != nil {
				metrics.AddDrop()
				logx.Errorf("[http] dropped, %s - %s - %s",
					r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
				w.WriteHeader(http.StatusServiceUnavailable)
				return
			}

			cw := response.NewWithCodeResponseWriter(w)
			// Deferred so the outcome is reported after next has run.
			defer func() {
				if cw.Code >= http.StatusInternalServerError {
					promise.Reject(fmt.Sprintf("%d %s", cw.Code, http.StatusText(cw.Code)))
					return
				}

				promise.Accept()
			}()

			next.ServeHTTP(cw, r)
		}

		return http.HandlerFunc(guarded)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

修改代码进行断路器测试:

// GetUserInfo is shown here with a temporary change used to exercise the
// circuit breaker: it returns an error for every user ID up to and including 5.
// The remainder of the method body is elided in this snippet.
func (l *GetUserInfoLogic) GetUserInfo() (resp *types.UserInfoResp, err error) {
	// Read "userId" from the request context and convert it to int64.
	// NOTE(review): the single-value type assertion panics when the value is
	// absent or is not a json.Number — confirm the auth middleware always sets it.
	userId, err := l.ctx.Value("userId").(json.Number).Int64()
	if err != nil {
		return nil, biz.TokenError
	}
	// Test only: force an error so that repeated calls can trip the breaker.
	if userId <= 5 {
		return nil, errors.New("熔断错误测试")
	}
    ...
}
1
2
3
4
5
6
7
8
9
10

请求接口,返回500错误,请求一定次数后,返回503(服务不可用),断路器生效

关闭:

Middlewares:
 # 控制断路器是否开启
  Breaker: false
1
2
3

3. 内容安全中间件

// LimitContentSecurityHandler returns a middleware to verify content security.
//
// Only DELETE, GET, POST and PUT requests are verified; every other method is
// passed straight to next. For verified methods the content-security header is
// parsed with decrypters and its signature is checked against tolerance. On a
// parse or verification failure the callbacks are executed with the failure
// code (handleVerificationFailure is used when no callback is supplied). When
// verification passes and the request has an encrypted body, the request is
// routed through LimitCryptionHandler with the key from the header.
func LimitContentSecurityHandler(limitBytes int64, decrypters map[string]codec.RsaDecrypter,
	tolerance time.Duration, strict bool, callbacks ...UnsignedCallback) func(http.Handler) http.Handler {
	if len(callbacks) == 0 {
		callbacks = append(callbacks, handleVerificationFailure)
	}

	return func(next http.Handler) http.Handler {
		verify := func(w http.ResponseWriter, r *http.Request) {
			switch r.Method {
			case http.MethodDelete, http.MethodGet, http.MethodPost, http.MethodPut:
				// Verified below.
			default:
				next.ServeHTTP(w, r)
				return
			}

			header, err := security.ParseContentSecurity(decrypters, r)
			if err != nil {
				logx.Errorf("Signature parse failed, X-Content-Security: %s, error: %s",
					r.Header.Get(contentSecurity), err.Error())
				executeCallbacks(w, r, next, strict, httpx.CodeSignatureInvalidHeader, callbacks)
				return
			}

			if code := security.VerifySignature(r, header, tolerance); code != httpx.CodeSignaturePass {
				logx.Errorf("Signature verification failed, X-Content-Security: %s",
					r.Header.Get(contentSecurity))
				executeCallbacks(w, r, next, strict, code, callbacks)
				return
			}

			if r.ContentLength > 0 && header.Encrypted() {
				// The body is encrypted: decrypt it before calling next.
				LimitCryptionHandler(limitBytes, header.Key)(next).ServeHTTP(w, r)
				return
			}

			next.ServeHTTP(w, r)
		}

		return http.HandlerFunc(verify)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

采用RSA加密算法,客户端使用公钥对消息进行加密,服务端使用私钥对消息进行解密

配置项:

// A SignatureConf is a signature config.
	SignatureConf struct {
		// Strict defaults to false.
		// NOTE(review): presumably forwarded as the handler's strict flag — confirm.
		Strict      bool          `json:",default=false"`
		// Expiry defaults to 1h.
		// NOTE(review): presumably the signature time tolerance — confirm.
		Expiry      time.Duration `json:",default=1h"`
		// PrivateKeys lists the private keys available for decryption.
		PrivateKeys []PrivateKeyConf
	}
	// A PrivateKeyConf is a private key config.
	PrivateKeyConf struct {
		// Fingerprint identifies the key; clients send it as "key=" in the
		// X-Content-Security header.
		Fingerprint string
		// KeyFile is the path of the private key file, e.g. "etc/pri.key".
		KeyFile     string
	}
1
2
3
4
5
6
7
8
9
10
11

开启签名:

@server (
	//代表当前service的代码会放在account目录下
	//这里注意 冒号要紧贴着key
	group: account
	//路由前缀
	prefix: v1
	//开启jwt认证
	jwt: Auth
	//开启签名认证
	signature: true
)
1
2
3
4
5
6
7
8
9
10
11
// Register GET /user/info under the /v1 prefix with JWT authentication and
// signature verification enabled for the route group.
server.AddRoutes(
		[]rest.Route{
			{
				Method:  http.MethodGet,
				Path:    "/user/info",
				Handler: account.GetUserInfoHandler(serverCtx),
			},
		},
		// Route-group options: JWT secret, signature config and path prefix.
		rest.WithJwt(serverCtx.Config.Auth.AccessSecret),
		rest.WithSignature(serverCtx.Config.Signature),
		rest.WithPrefix("/v1"),
	)
1
2
3
4
5
6
7
8
9
10
11
12

设置签名未通过的处理:

	// Create the server with CORS enabled for all origins, allow the
	// X-Content-Security request header, and install a callback that runs
	// when signature verification does not pass.
	server := rest.MustNewServer(
		c.RestConf,
		rest.WithCors("*"),
        rest.WithCorsHeaders("X-Content-Security"),
		rest.WithUnsignedCallback(func(w http.ResponseWriter, r *http.Request, next http.Handler, strict bool, code int) {
			// Demo callback: it only prints a message; it neither writes a
			// response nor calls next.
			fmt.Println("-------------签名未通过")
		}),
	)
1
2
3
4
5
6
7
8

配置:

Signature:
  PrivateKeys:
    - Fingerprint: "kQz7qW/LWL+10KatnBaX3A=="
      KeyFile: "etc/pri.key"
1
2
3
4
-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQCyeDYV2ieOtNDi6tuNtAbmUjN9pTHluAU5yiKEz8826QohcxqU
KP3hybZBcm60p+rUxMAJFBJ8Dt+UJ6sEMzrf1rOFYOImVvORkXjpFU7sCJkhnLMs
/kxtRzcZJG6ADUlG4GDCNcZpY/qELEvwgm2kCcHitGC2mO8opFFFHTR0aQIDAQAB
AoGAcENv+jT9VyZkk6karLuG75DbtPiaN5+XIfAF4Ld76FWVOs9V88cJVON20xpx
ixBphqexCMToj8MnXuHJEN5M9H15XXx/9IuiMm3FOw0i6o0+4V8XwHr47siT6T+r
HuZEyXER/2qrm0nxyC17TXtd/+TtpfQWSbivl6xcAEo9RRECQQDj6OR6AbMQAIDn
v+AhP/y7duDZimWJIuMwhigA1T2qDbtOoAEcjv3DB1dAswJ7clcnkxI9a6/0RDF9
0IEHUcX9AkEAyHdcegWiayEnbatxWcNWm1/5jFnCN+GTRRFrOhBCyFr2ZdjFV4T+
acGtG6omXWaZJy1GZz6pybOGy93NwLB93QJARKMJ0/iZDbOpHqI5hKn5mhd2Je25
IHDCTQXKHF4cAQ+7njUvwIMLx2V5kIGYuMa5mrB/KMI6rmyvHv3hLewhnQJBAMMb
cPUOENMllINnzk2oEd3tXiscnSvYL4aUeoErnGP2LERZ40/YD+mMZ9g6FVboaX04
0oHf+k5mnXZD7WJyJD0CQQDJ2HyFbNaUUHK+lcifCibfzKTgmnNh9ZpePFumgJzI
EfFE5H+nzsbbry2XgJbWzRNvuFTOLWn4zM+aFyy9WvbO
-----END RSA PRIVATE KEY-----
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

前端添加头信息:

'X-Content-Security':"key=kQz7qW/LWL+10KatnBaX3A==; secret=Ar5FnHunK82kPeM1eQHpaBHjsjas4ruT8cB/Ht+St44nX7fsLU968vkqvaIJ/+uH4gAeaSwf9+NeTxFcFecl+rBrp5uLnLKMsPzEsXaoBPd33nFxwWcpQ4RR4Z4y2KxcIRZ2Fkj2iEqKFdpLgPQ/giSmh9uZddOjDJpdgs76Yw0=; signature=oyJlQyfBDkRWtJOoG8fKiCXaDvAswRGTtGzFf6Avjy4=",
1

访问测试。

生成X-Content-Security的代码:

package biz

import (
	"bytes"
	"crypto/hmac"
	"crypto/md5"
	"crypto/sha256"
	"encoding/base64"
	"fmt"
	"github.com/zeromicro/go-zero/core/iox"
	"io"
	"log"
	"net/http"
	"os"
	"strconv"
	"strings"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/zeromicro/go-zero/core/codec"
	"github.com/zeromicro/go-zero/core/fs"
)

// Key material used by TestContentSecurity.
// NOTE(review): demo/test keys only — do not reuse them outside of examples.
const (
	// pubKey is the PEM-encoded RSA public key used to encrypt the secret and
	// to compute the key fingerprint.
	pubKey = `-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCyeDYV2ieOtNDi6tuNtAbmUjN9
pTHluAU5yiKEz8826QohcxqUKP3hybZBcm60p+rUxMAJFBJ8Dt+UJ6sEMzrf1rOF
YOImVvORkXjpFU7sCJkhnLMs/kxtRzcZJG6ADUlG4GDCNcZpY/qELEvwgm2kCcHi
tGC2mO8opFFFHTR0aQIDAQAB
-----END PUBLIC KEY-----`
	// priKey is the PEM-encoded RSA private key written to a temporary file by
	// the test. Presumably the counterpart of pubKey — confirm.
	priKey = `-----BEGIN RSA PRIVATE KEY-----
MIICXQIBAAKBgQCyeDYV2ieOtNDi6tuNtAbmUjN9pTHluAU5yiKEz8826QohcxqU
KP3hybZBcm60p+rUxMAJFBJ8Dt+UJ6sEMzrf1rOFYOImVvORkXjpFU7sCJkhnLMs
/kxtRzcZJG6ADUlG4GDCNcZpY/qELEvwgm2kCcHitGC2mO8opFFFHTR0aQIDAQAB
AoGAcENv+jT9VyZkk6karLuG75DbtPiaN5+XIfAF4Ld76FWVOs9V88cJVON20xpx
ixBphqexCMToj8MnXuHJEN5M9H15XXx/9IuiMm3FOw0i6o0+4V8XwHr47siT6T+r
HuZEyXER/2qrm0nxyC17TXtd/+TtpfQWSbivl6xcAEo9RRECQQDj6OR6AbMQAIDn
v+AhP/y7duDZimWJIuMwhigA1T2qDbtOoAEcjv3DB1dAswJ7clcnkxI9a6/0RDF9
0IEHUcX9AkEAyHdcegWiayEnbatxWcNWm1/5jFnCN+GTRRFrOhBCyFr2ZdjFV4T+
acGtG6omXWaZJy1GZz6pybOGy93NwLB93QJARKMJ0/iZDbOpHqI5hKn5mhd2Je25
IHDCTQXKHF4cAQ+7njUvwIMLx2V5kIGYuMa5mrB/KMI6rmyvHv3hLewhnQJBAMMb
cPUOENMllINnzk2oEd3tXiscnSvYL4aUeoErnGP2LERZ40/YD+mMZ9g6FVboaX04
0oHf+k5mnXZD7WJyJD0CQQDJ2HyFbNaUUHK+lcifCibfzKTgmnNh9ZpePFumgJzI
EfFE5H+nzsbbry2XgJbWzRNvuFTOLWn4zM+aFyy9WvbO
-----END RSA PRIVATE KEY-----`
	// body is an empty request body; it is not referenced by the code shown here.
	body = ""
)

// key is the 32-byte secret used for the HMAC-SHA256 signature; it is also
// sent base64-encoded inside the encrypted secret.
var key = []byte("q4t7w!z%C*F-JaNdRgUjXn2r5u8x/A?D")

// TestContentSecurity builds a signed GET /v1/user/info request and prints the
// X-Content-Security header value a client has to send.
//
// Steps: hash the request body, sign "time\nmethod\npath\nquery\nbodyhash"
// with HMAC-SHA256 using key, RSA-encrypt the "version; type; key; time"
// secret with pubKey, and join the key fingerprint, the encrypted secret and
// the signature into the header value.
func TestContentSecurity(t *testing.T) {
	tests := []struct {
		name        string
		mode        string
		extraKey    string
		extraSecret string
		extraTime   string
		err         error
		code        int
	}{
		{
			name:      "encrypted",
			mode:      "1",
			extraTime: "3600",
		},
	}

	for _, test := range tests {
		test := test
		t.Run(test.name, func(t *testing.T) {
			t.Parallel()
			limit := int64(1024)
			src := []byte("")
			rb := bytes.NewBuffer(src)
			wb := new(bytes.Buffer)
			r, err := http.NewRequest(http.MethodGet, "http://localhost:8888/v1/user/info",
				io.TeeReader(iox.LimitTeeReader(http.NoBody, wb, limit), rb))
			// Check the error before touching r: on failure r is nil and
			// setting a header on it would panic.
			if err != nil {
				t.Fatal(err)
			}
			r.Header.Set("Content-Type", "application/json")

			// The same timestamp (now + 3600s) goes into both the signed
			// content and the encrypted secret.
			timestamp := time.Now().Unix()
			bodySign := computeBodySignature(r)
			fmt.Println(bodySign)
			contentOfSign := strings.Join([]string{
				strconv.FormatInt(timestamp+3600, 10),
				http.MethodGet,
				r.URL.Path,
				r.URL.RawQuery,
				bodySign,
			}, "\n")
			sign := hs256(key, contentOfSign)
			content := strings.Join([]string{
				"version=v1",
				"type=" + test.mode,
				fmt.Sprintf("key=%s", base64.StdEncoding.EncodeToString(key)) + test.extraKey,
				"time=" + strconv.FormatInt(timestamp+3600, 10),
			}, "; ")

			encrypter, err := codec.NewRsaEncrypter([]byte(pubKey))
			if err != nil {
				log.Fatal(err)
			}

			output, err := encrypter.Encrypt([]byte(content))
			if err != nil {
				log.Fatal(err)
			}

			// Assemble and print the final header value.
			encryptedContent := base64.StdEncoding.EncodeToString(output)
			join := strings.Join([]string{
				fmt.Sprintf("key=%s", fingerprint(pubKey)),
				"secret=" + encryptedContent + test.extraSecret,
				"signature=" + sign,
			}, "; ")
			fmt.Println(join)
			r.Header.Set("X-Content-Security", join)

			// Write the private key to a temporary file and remove it when
			// the subtest ends.
			file, err := fs.TempFilenameWithText(priKey)
			assert.Nil(t, err)
			defer os.Remove(file)

		})
	}
}

// computeBodySignature returns the lowercase hex SHA-256 digest of the request
// body. The body is swapped for the first reader returned by
// iox.DupReadCloser while hashing and then replaced by the second one.
// NOTE(review): presumably the second reader replays the bytes consumed here
// so the body stays readable — confirm against iox.DupReadCloser.
func computeBodySignature(r *http.Request) string {
	var replay io.ReadCloser
	r.Body, replay = iox.DupReadCloser(r.Body)

	hasher := sha256.New()
	// A read error is ignored; the digest then covers the bytes read so far.
	_, _ = io.Copy(hasher, r.Body)
	r.Body = replay

	digest := hasher.Sum(nil)
	return fmt.Sprintf("%x", digest)
}

// fingerprint returns the standard base64 encoding of the MD5 digest of key.
func fingerprint(key string) string {
	digest := md5.New()
	// The returned count and error are discarded, as in the original call.
	_, _ = io.WriteString(digest, key)

	sum := digest.Sum(nil)
	return base64.StdEncoding.EncodeToString(sum)
}

// hs256 returns the standard base64 encoding of the HMAC-SHA256 of body,
// keyed with key.
func hs256(key []byte, body string) string {
	mac := hmac.New(sha256.New, key)
	// The returned count and error are discarded, as in the original call.
	_, _ = io.WriteString(mac, body)

	tag := mac.Sum(nil)
	return base64.StdEncoding.EncodeToString(tag)
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147

4. 解密中间件

// LimitCryptionHandler returns a middleware to handle cryption.
//
// The response is written through a cryption response writer that is flushed
// with key when the handler returns. A request with a positive ContentLength
// has its body decrypted first (bounded by limitBytes); if that fails, 400 is
// written to the underlying writer and next is not called.
func LimitCryptionHandler(limitBytes int64, key []byte) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		handle := func(w http.ResponseWriter, r *http.Request) {
			cw := newCryptionResponseWriter(w)
			defer cw.flush(key)

			// Only requests that declare a body need decrypting.
			if r.ContentLength > 0 {
				if err := decryptBody(limitBytes, key, r); err != nil {
					w.WriteHeader(http.StatusBadRequest)
					return
				}
			}

			next.ServeHTTP(cw, r)
		}

		return http.HandlerFunc(handle)
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21

使用AES加密方式,ECB模式。

内容安全中间件中,如果解密出来的header信息中携带有对内容加密的标识,需要继续对内容进行解密才可以,这时就使用到了解密中间件

5. 压缩管理中间件


// GunzipHandler returns a middleware to gunzip http request body.
//
// When the request's content-encoding header contains gzipEncoding, the body
// is replaced by a gzip reader over the original body. If the gzip reader
// cannot be created, 400 is written and next is not called.
func GunzipHandler(next http.Handler) http.Handler {
	gunzip := func(w http.ResponseWriter, r *http.Request) {
		encoding := r.Header.Get(httpx.ContentEncoding)
		if !strings.Contains(encoding, gzipEncoding) {
			next.ServeHTTP(w, r)
			return
		}

		zr, err := gzip.NewReader(r.Body)
		if err != nil {
			w.WriteHeader(http.StatusBadRequest)
			return
		}

		r.Body = zr
		next.ServeHTTP(w, r)
	}

	return http.HandlerFunc(gunzip)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

如果请求Header中Content-Encoding=gzip,则使用gzip的方式进行读取

6. 日志中间件

// LogHandler returns a middleware that logs a brief line per request.
//
// It starts a timer, attaches a log collector to the request context, wraps
// the response writer so the status code is available, and calls logBrief
// once next has returned. The request body is swapped for the first reader
// from iox.LimitDupReadCloser while next runs and for the second one before
// logging.
func LogHandler(next http.Handler) http.Handler {
	logged := func(w http.ResponseWriter, r *http.Request) {
		timer := utils.NewElapsedTimer()
		collector := new(internal.LogCollector)
		codeWriter := response.NewWithCodeResponseWriter(w)

		var body, dup io.ReadCloser
		body, dup = iox.LimitDupReadCloser(r.Body, limitBodyBytes)
		// The body must be replaced before WithContext copies the request.
		r.Body = body

		ctx := internal.WithLogCollector(r.Context(), collector)
		next.ServeHTTP(codeWriter, r.WithContext(ctx))

		r.Body = dup
		logBrief(r, codeWriter.Code, timer, collector)
	}

	return http.HandlerFunc(logged)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
// logBrief writes the summary log entry for a finished request.
//
// The entry contains status, method, URI, remote address and user agent. When
// the elapsed time exceeds slowThreshold, an extra slow-call entry is logged.
// For responses that isOkResponse rejects, a dump of the request is appended
// and the entry is logged as an error; otherwise it is logged as info. Any
// content flushed from logs is appended in both cases.
func logBrief(r *http.Request, code int, timer *utils.ElapsedTimer, logs *internal.LogCollector) {
	var buf bytes.Buffer
	duration := timer.Duration()
	logger := logx.WithContext(r.Context()).WithDuration(duration)

	fmt.Fprintf(&buf, "[HTTP] %s - %s %s - %s - %s",
		wrapStatusCode(code), wrapMethod(r.Method), r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())

	if duration > slowThreshold.Load() {
		logger.Slowf("[HTTP] %s - %s %s - %s - %s - slowcall(%s)",
			wrapStatusCode(code), wrapMethod(r.Method), r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent(),
			timex.ReprOfDuration(duration))
	}

	ok := isOkResponse(code)
	if !ok {
		fmt.Fprintf(&buf, "\n%s", dumpRequest(r))
	}

	if body := logs.Flush(); len(body) > 0 {
		fmt.Fprintf(&buf, "\n%s", body)
	}

	entry := buf.String()
	if !ok {
		logger.Error(entry)
		return
	}

	logger.Info(entry)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

打印请求日志,可以通过配置开关

Middlewares:
# 是否开启日志记录
  Log: false
# 配置为true 代表记录详细的日志 对应中间件为DetailedLogHandler 必须先开启日志才能生效
Verbose: true
1
2
3
4
5

7. ContentLength 管理中间件

// MaxBytesHandler returns a middleware that limit reading of http request body.
//
// With n <= 0 the middleware is a pass-through that returns next unchanged.
// Otherwise a request whose ContentLength exceeds n is logged and rejected
// with 413; all other requests are forwarded to next.
func MaxBytesHandler(n int64) func(http.Handler) http.Handler {
	// A non-positive limit means "no limit".
	if n <= 0 {
		return func(next http.Handler) http.Handler {
			return next
		}
	}

	return func(next http.Handler) http.Handler {
		limited := func(w http.ResponseWriter, r *http.Request) {
			if r.ContentLength <= n {
				next.ServeHTTP(w, r)
				return
			}

			internal.Errorf(r, "request entity too large, limit is %d, but got %d, rejected with code %d",
				n, r.ContentLength, http.StatusRequestEntityTooLarge)
			w.WriteHeader(http.StatusRequestEntityTooLarge)
		}

		return http.HandlerFunc(limited)
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22

Content-Length表示请求消息中body的长度,此中间件可以通过配置对消息的长度做限制,不符合条件返回413(请求的数据超过服务器的限制)

Middlewares:
  #控制此中间件是否开启
  MaxBytes: false
1
2
3

在创建路由时:

//配置限制的数据长度
rest.WithMaxBytes(1048576)
1
2
@server (
	//代表当前service的代码会放在account目录下
	//这里注意 冒号要紧贴着key
	group: account
	//路由前缀
	prefix:   v1
	jwt:      Auth
    //配置限制的数据长度
	maxBytes: 1048576
)
1
2
3
4
5
6
7
8
9
10

8. 限流中间件

// MaxConnsHandler returns a middleware that limit the concurrent connections.
//
// With n <= 0 the middleware is a pass-through that returns next unchanged.
// Otherwise each wrapped handler gets its own limit of n: a request that
// cannot borrow from it is logged and rejected with 503, and a request that
// can borrow returns its slot once next has finished.
func MaxConnsHandler(n int) func(http.Handler) http.Handler {
	// A non-positive value disables the limit.
	if n <= 0 {
		return func(next http.Handler) http.Handler {
			return next
		}
	}

	return func(next http.Handler) http.Handler {
		latch := syncx.NewLimit(n)

		limited := func(w http.ResponseWriter, r *http.Request) {
			if !latch.TryBorrow() {
				internal.Errorf(r, "concurrent connections over %d, rejected with code %d",
					n, http.StatusServiceUnavailable)
				w.WriteHeader(http.StatusServiceUnavailable)
				return
			}

			// Give the slot back when the request is done; log a failed return.
			defer func() {
				if err := latch.Return(); err != nil {
					logx.WithContext(r.Context()).Error(err)
				}
			}()

			next.ServeHTTP(w, r)
		}

		return http.HandlerFunc(limited)
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

用于限制 http 最大并发请求数,当并发请求数超过设置的值(默认为 10000)时,会返回 http.StatusServiceUnavailable 状态码

Middlewares:
 #配置是否开启中间件
  MaxConns: true
# 配置触发限流的并发请求数
MaxConns: 10000
1
2
3
4
5

可以通过并发测试软件对其进行测试验证。

9. 指标统计中间件


// MetricHandler returns a middleware that stat the metrics.
//
// For every request it records one stat.Task on metrics whose Duration is the
// time elapsed between entering the middleware and next returning.
func MetricHandler(metrics *stat.Metrics) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		measured := func(w http.ResponseWriter, r *http.Request) {
			begin := timex.Now()
			// Deferred so the task is recorded when the handler exits.
			defer func() {
				task := stat.Task{
					Duration: timex.Since(begin),
				}
				metrics.Add(task)
			}()

			next.ServeHTTP(w, r)
		}

		return http.HandlerFunc(measured)
	}
}
// log passes the report to writeReport and, when stat logging is enabled,
// also emits it as a stat log line with qps, drops, average, median and the
// 90th/99th/99.9th percentile times.
func log(report *StatReport) {
	writeReport(report)

	if !logEnabled.True() {
		return
	}

	const format = "(%s) - qps: %.1f/s, drops: %d, avg time: %.1fms, med: %.1fms, " +
		"90th: %.1fms, 99th: %.1fms, 99.9th: %.1fms"
	logx.Statf(format,
		report.Name, report.ReqsPerSecond, report.Drops, report.Average, report.Median,
		report.Top90th, report.Top99th, report.Top99p9th)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

开启指标统计后,统计请求相关的一些指标,qps,drops,avg,med,t90,t99等

Middlewares:
  # 开启指标统计
  Metrics: false
1
2
3
stat   (user-api) - qps: 0.1/s, drops: 0, avg time: 1.2ms, med: 1.5ms, 90th: 1.6ms, 99th: 1.6ms, 99.9th: 1.6ms     caller=stat/metrics.go:210
1

stat (user-api) - qps: 0.1/s, drops: 0, avg time: 1.2ms, med: 1.5ms, 90th: 1.6ms, 99th: 1.6ms, 99.9th: 1.6ms caller=stat/metrics.go:210

TP指标: 指在一个时间段内,统计该方法每次调用所消耗的时间,并将这些时间按从小到大的顺序进行排序, 并取出结果为 : 总次数 * 指标数 = 对应TP指标的序号 , 再根据序号取出对应排序好的时间,即为TP指标。

  • 假设上一分钟内接口被调用100次,100次的调用耗时分别为:1、2、3...99、100秒。
  • 我们对耗时进行从小到大排序,形成容量为100的数组A=[1s,2s,3s....99s,100s]
  • TP50的计算方式:100*50%=50,所以TP50指标=A[50]=50s
  • TP99的计算方式:100*99%=99,所以TP99指标=A[99]=99s
  • TP999的计算方式:100*99.9%=99.9,99.9进位取整为100,所以TP999指标=A[100]=100s

10. panic 恢复中间件

// RecoverHandler returns a middleware that recovers if panic happens.
//
// A panic raised while next is serving is recovered, logged together with the
// current stack trace, and answered with 500.
func RecoverHandler(next http.Handler) http.Handler {
	guarded := func(w http.ResponseWriter, r *http.Request) {
		defer func() {
			// recover must be called directly in the deferred function.
			cause := recover()
			if cause == nil {
				return
			}

			internal.Error(r, fmt.Sprintf("%v\n%s", cause, debug.Stack()))
			w.WriteHeader(http.StatusInternalServerError)
		}()

		next.ServeHTTP(w, r)
	}

	return http.HandlerFunc(guarded)
}
1
2
3
4
5
6
7
8
9
10
11
12
13

用于捕获panic错误

11. 超时中间件

/ TimeoutHandler returns the handler with given timeout.
// If client closed request, code 499 will be logged.
// Notice: even if canceled in server side, 499 will be logged as well.
func TimeoutHandler(duration time.Duration) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		if duration <= 0 {
			return next
		}

		return &timeoutHandler{
			handler: next,
			dt:      duration,
		}
	}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Timeout: 3000 # 默认超时时间为3000毫秒
1
@server (
    timeout:    3s // 对当前 Foo 语法块下的所有路由进行超时配置,不需要则请删除此行
)
1
2
3

12. 负载监控中间件


// SheddingHandler returns a middleware that does load shedding.
//
// With a nil shedder the middleware is a pass-through that returns next
// unchanged. Otherwise every request is counted and the shedder decides
// whether it may proceed. A refused request is recorded as a drop, logged and
// answered with 503. For an admitted request the response status is reported
// back to the shedder: 503 counts as a failure, anything else as a pass.
func SheddingHandler(shedder load.Shedder, metrics *stat.Metrics) func(http.Handler) http.Handler {
	if shedder == nil {
		return func(next http.Handler) http.Handler {
			return next
		}
	}

	ensureSheddingStat()

	return func(next http.Handler) http.Handler {
		shed := func(w http.ResponseWriter, r *http.Request) {
			sheddingStat.IncrementTotal()

			promise, err := shedder.Allow()
			if err != nil {
				metrics.AddDrop()
				sheddingStat.IncrementDrop()
				logx.Errorf("[http] dropped, %s - %s - %s",
					r.RequestURI, httpx.GetRemoteAddr(r), r.UserAgent())
				w.WriteHeader(http.StatusServiceUnavailable)
				return
			}

			cw := response.NewWithCodeResponseWriter(w)
			// Deferred so the outcome is reported after next has run.
			defer func() {
				if cw.Code == http.StatusServiceUnavailable {
					promise.Fail()
					return
				}

				sheddingStat.IncrementPass()
				promise.Pass()
			}()

			next.ServeHTTP(cw, r)
		}

		return http.HandlerFunc(shed)
	}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// shouldDrop reports whether the current request should be shed.
//
// A request is dropped only when the system is overloaded or still hot and,
// in addition, throughput is high. Each drop is logged and reported together
// with the values that led to the decision.
func (as *adaptiveShedder) shouldDrop() bool {
	// Not overloaded and not hot: nothing to shed.
	if !as.systemOverloaded() && !as.stillHot() {
		return false
	}

	// Under pressure, but throughput is not high.
	if !as.highThru() {
		return false
	}

	flying := atomic.LoadInt64(&as.flying)

	as.avgFlyingLock.Lock()
	avgFlying := as.avgFlying
	as.avgFlyingLock.Unlock()

	msg := fmt.Sprintf(
		"dropreq, cpu: %d, maxPass: %d, minRt: %.2f, hot: %t, flying: %d, avgFlying: %.2f",
		stat.CpuUsage(), as.maxPass(), as.minRt(), as.stillHot(), flying, avgFlying)
	logx.Error(msg)
	stat.Report(msg)

	return true
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

通过对系统状态的判定,监测系统是否处于高负载状况,如果超过阈值,则返回错误

Middlewares:
  # 是否开启
  Shedding: false
CpuThreshold: 900 # cpu百分比阈值,0-1000取值 900是90%
1
2
3
4

可以通过rest.WithPriority()设置某个接口为优先级接口,此时cpu阈值的计算方式为 (CpuThreshold+1000) >> 1,例如 CpuThreshold 为 900 时阈值为 950(即95%)