Grpc拦截器

gRPC 拦截器

服务端拦截器

grpc接口服务端拦截器,一般用来做一些预处理及后处理操作。如下,举两个常用的例子。

  1. 在微服务之间使用gRPC互相调用的时候,会传入一些公共的与业务不相关的元数据,这些数据就很适合在拦截器中实现。

如下服务端的拦截器将gRPC client传入的数据放入gRPC的context中,接口中就可以使用ctx.Value去获取该数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// MetaDataInterceptor get grpc server info, requestId/traceId/LogId
func MetaServerDataInterceptor() grpc.UnaryServerInterceptor {
// 拦截器函数签名
// @params ctx Grpc context
// @params req grpc request
// @params info grpc request info
// @params handler the grpc method
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

// do what you want to do
// get metadata from grpc client
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
// Set request info for context.
// define your key
for _, key := range []string{"requestId"} {
value := md.Get(key)
// ignore it if not exists.
if len(value) >= 1 {
// set value to context. you can use ctx.Value to get it from your grpc method
ctx = context.WithValue(ctx, key, value[0])
}
}
// next
return handler(ctx, req)
}
}
  1. 在实际的环境中,经常会需要在gRPC 接口之前之后做一些处理。比如,在开始之前记录时间,执行之后记录耗时操作;执行之后判断执行结果等等

如下所示,实现了一个记录接口耗时功能的拦截器,当然实际不会这么low。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// API time elas time get grpc server info
func APITimeInterceptor() grpc.UnaryServerInterceptor {
// 拦截器签名
return func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error) {

// do what you want to do
start := time.Now().UnixNano()
// do gRPC method
ret := handler(ctx, req)
// do what you want after the grpc method
fmt.Println(time.Now().UnixNano() - start)
return ret
}
}

服务端流式接口拦截器

在golang的gRPC中,普通接口与stream接口的拦截器,需要分别实现。以上的拦截器只用于非stream的接口,对于stream接口,以上拦截器是不生效的。
流式拦截器函数签名如下:

1
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

查看流式拦截器可知,stream的context是在ServerStream中的,因此stream 要传递context 需要继承ServerStream并覆盖context。如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Context
type WrappedStream struct {
grpc.ServerStream // serverStream interface
Ctx *context.Context // 定义ctx,覆盖ServerStream中的context
}

// Context override the context method and can config the context manually
func (c WrappedStream) Context() context.Context {
return *c.Ctx
}

// NewWrappedStream wrapper the grpc.ServerStream
func NewWrappedStream(s grpc.ServerStream, ctx *context.Context) grpc.ServerStream {
wrapper := &WrappedStream{
ServerStream: s,
Ctx: ctx,
}
stream := grpc.ServerStream(wrapper)
return stream
}

实现该封装之后,就可以将上层的context获取并将元数据写入context后,调用NewWrappedStream传入gRPC的接口调用中。如下所示

  1. 流式拦截器实现元数据的传递
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// stream method to get meta data
func MetaStreamServerInterceptor() grpc.StreamServerInterceptor {
// 函数签名
return func(
srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// 获取当前 grpc context
ctx := ss.Context()
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
// Set request info for context.
// define your key
for _, key := range []string{"requestId"} {
value := md.Get(key)
// ignore it if not exists.
if len(value) >= 1 {
// set value to context. you can use ctx.Value to get it from your grpc method
ctx = context.WithValue(ctx, key, value[0])
}
}
// set context to next
return handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx))
}
}

gRPC客户端拦截器

gRPC客户端拦截器是在调用gRPC接口之前与之后执行的操作。比如,元数据需要在请求接口之前塞入到metaData中(http2.0Header),才会传递到gRPC的服务端。
如下,将当前接口context中的数据放入header中传入服务端。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// request grpc service with requestId/traceId info.
func MetaClientDataInterceptor() grpc.UnaryClientInterceptor {
// 函数签名
return func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
// 获取当前header数据,没有则新建一个
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
for _, key := range keyNames {
value := ctx.Value(key)
if strValue, ok := value.(string); ok && strValue != "" {

md.Set(key, strValue)
}
}
// 将header写入
ctx = metadata.NewOutgoingContext(ctx, md)
// 执行调用
return invoker(ctx, method, req, resp, cc, opts...)
}
}

流式客户端拦截器

Stream client的实现也是比较简单的,与服务端不同的是,客户端的流式拦截器不需要封装一层,可以直接使用。
如下,同样实现了元数据传递到服务端的拦截器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream server
func MetaStreamClientInterceptor() grpc.StreamClientInterceptor {
// 函数签名
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer,
opts ...grpc.CallOption) (grpc.ClientStream, error) {

// 从context获取元数据
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
md = metadata.Pairs()
}
for _, key := range keyNames {
value := ctx.Value(key)
if strValue, ok := value.(string); ok && strValue != "" {
md.Set(key, strValue)
}
}
// set metadata to ctx
ctx = metadata.NewOutgoingContext(ctx, md)

clientStream, err := streamer(ctx, desc, cc, method, opts...)

return clientStream, err
}
}

总结

拦截器为gRPC的服务端/客户端复用公共模块提供了一种很简单方便的方法,只需要实现对应的拦截器函数,在服务端启动或者客户端连接的时候作为选项传入即可(自行搜索)。
需要注意的是,在Golang中,拦截器分为普通接口与流式接口的拦截器,需要分别实现。

  1. 流式服务端拦截器

gRPC中拦截器流式接口拦截器需要实现如下签名的函数,有兴趣可深入了解下。例子如上所示

1
2
3
4
5
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
// info contains all the information of this RPC the interceptor can operate on. And handler is the
// service method implementation. It is the responsibility of the interceptor to invoke handler to
// complete the RPC.
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error

注意streamServer拦截器如果需要传递context,需要将ServerStream进行封装,覆盖Context 函数

  1. 普通服务端拦截器

普通方法的拦截器实现比较简单,实现如下签名函数

1
2
3
4
5
6
7
8
// @params ctx: grpc context
// @params req: the request params
// @params info: the grpc request info
// @params handler: the real grpc method
func(ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler) (resp interface{}, err error)
  1. 客户端普通拦截器

golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。

1
2
3
4
5
6
7
8
9
10
11
// @params method: the RPC name
// @params req: the request
// @params resp: the response
// @params cc: the ClientConn on which the RPC was invoked
// @params invoker: the invoker of grpc methor
// @params opts: the option
func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
)
  1. 客户端流失拦截器

实现如下签名的函数即可

1
2
3
4
5
6
// @params desc: contains a description of the stream
// @params cc: the ClientConn on which the RPC was invoked
// @params method: the RPC name
// @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it
// @params opts: the option
func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

以上即为Golang的拦截器实现,可以分为服务端与客户端的拦截器,两端分别有流式拦截器与普通接口拦截器,在使用的时候可根据自己的业务需求实现。

初识ElasticSearch

What is ElasticSearch

先搬一个官网的定义。

Elasticsearch is a real-time, distributed storage, search, and analytics engine

Elasticsearch 是一个实时分布式存储、搜索、分析的引擎。

要想了解它是什么,首先得看他能干什么,概念很清晰: 分布式存储/搜索/分析引擎。

  1. 看这些概念,咋一看,数据库也都可以做到。
  • 分布式存储 - 数据库也可以有主从集群模式
  • 搜索 - 数据库也可以用like %% 来查找

的确,这样做的确可以, mysql也支持全文检索。但是有个问题: like %% 是不走索引的,这就意味着: 数据量非常大的时候,我们的查询肯定是秒级的。

  1. 我还想提一个概念: 全文检索

类似搜索引擎,输入往往是多种多样的,不同的人有不同的表达方式,但实际 都是一个含义,数据库的准确性不高,效率低下,高并发下,数据库会被拖垮。

ElasticSearch 是专门做搜索的,就是为了在理解用户输入语义并高效搜索匹配度高的文档记录。

Elasticsearch基本概念

  • 近实时(NRT)

ElasticSearch是基于Lucene库的,Lucene数据只有刷新到磁盘,才可以被检索到,内存缓存中的数据只有刷新到磁盘才可以被检索。ElasticSearch默认是每秒刷新一次,也就是文档的变化会在一秒之后可见。因此近实时搜索。也可根据自己的需求设置刷新频率。

A Lucene index with new documents in the in-memory buffer

  • 集群(Cluster)

海量数据单机无法存储,就需要使用集群,将多个节点组织在一起,共同维护所有数据,共同提供索引和搜索功能。

  • 节点(node)

一个节点就是集群中的一个服务器,存储部分数据,参与索引与搜索。

  • 分片(shards & replicas)

一个索引可以存储超出单个结点硬件限制的大量数据,为了解决这个问题,Elasticsearch提供了将索引划分成多份的能力,这些份就叫做分片。为保证单点故障,一个分片会保存不止一份,可分为一个主分片(primary shard)与多个*复制分片(replica shard) *,复制分片的数量可动态调整,复制分片也可用来提升系统 的读性能。

  • 文档(Document)

一个文档是一个可被索引的基础信息单元。文档以JSON(Javascript Object Notation)格式来表示。

  • 索引(index)

一个索引就是一个拥有几分相似特征的文档的集合。

  • 索引类型(type)

索引类型是在一个索引中,不同类型的数据类型。一条文档中有(type)字段用来区分索引类型,es7.x以上取消同一个索引中存在不同索引类型的数据,也就是说,(_type)字段固定,默认为_doc。

如下,在7.x之前的ES可以在一个索引中创建不同索引类型的数据:

1
curl -XPOST localhost:9200/indexname/typename -H 'Content-Type:application/json' -d '{"data": 1234}'

ElasticSearch RestFul API

ES对外提供RestFul API来读写集群,设置集群,获取集群状态操作。

集群状态API

  • 集群状态

    GET /_cluster/health

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
curl http://localhost:9200/_cluster/health --user xx:xxxx
{
"cluster_name" : "es",
"status" : "green",
"timed_out" : false,
"number_of_nodes" : 2,
"number_of_data_nodes" : 2,
"active_primary_shards" : 1216,
"active_shards" : 2432,
"relocating_shards" : 0,
"initializing_shards" : 0,
"unassigned_shards" : 0,
"delayed_unassigned_shards" : 0,
"number_of_pending_tasks" : 0,
"number_of_in_flight_fetch" : 0,
"task_max_waiting_in_queue_millis" : 0,
"active_shards_percent_as_number" : 100.0
}
  • 集群节点列表
1
2
3
4
5
curl http://localhost:9200/_cat/nodes?v --user xxx:xxxx
ip heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
9.135.145.82 25 92 0 0.10 0.13 0.21 cdfhilmrstw * es-node0
9.135.91.111 21 99 0 0.01 0.07 0.08 cdfhilmrstw - es-node1
9.135.170.150 48 36 2 0.38 0.33 0.26 cdfhilmrstw - es-node2
  • 集群健康状态

结果与_cluster/health一致

1
2
3
4
curl --user elastic:4j243cNvO1770iCs http://10.1.1.45:9200/_cat/health?v

epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1620725415 09:30:15 es-nnx25yd7 green 11 8 4632 2316 0 0 0 0 - 100.0%
  • 节点分配资源状态
1
2
3
4
5
curl --user elastic:4j243cNvO1770iCs http://10.1.1.45:9200/_cat/allocation?v

shards disk.indices disk.used disk.avail disk.total disk.percent   host    ip     node
 9    38.8mb      9.1gb     8.6gb    17.7gb      51   192.168.2.114    192.168.2.114    node-1
 9    38.8mb      4.7gb     13gb    17.7gb      26    192.168.2.116    192.168.2.116    node-2

索引文档操作

  • 索引列表
1
curl http://localhost:9200/_cat/indices?pretty --user xx:xxxx
  • 查看索引的设置
1
curl http://localhost:9200/[index_name]/_settings
  • 查看索引映射
1
curl http://localhost:9200/[index_name]/_mapping  --user xx:xxx
  • 创建索引
1
2
3
4
5
6
7
curl -H "Content-Type: application/json" -XPUT localhost:9200/blogs -d '
{
"settings": {
"number_of_shards": 3, # 主分片
"number_of_replicas": 1 # 副本分片
}
}'
  1. 主分片在索引创建以后就固定了,不可更改,如要修改可重建索引,将数据reindex过去;
  2. 副本分片最大值是 n-1(n为节点个数),复制分片可随时修改个数
1
2
3
4
5
> curl -H "Content-Type: application/json" -XPUT localhost:9200/blogs/_settings -d '
> {
> "number_of_replicas": 2
> }'
>
  • reIndex操作
1
2
3
4
5
6
7
8
9
curl -H "Content-Type: application/json" -XPOST localhost:9200/_reindex -d '
{
"source": {
"index": "accesslog"
},
"dest": {
"index": "newlog"
}
}'
  • 删除索引
1
curl -H "Content-Type: application/json" -XDELETE localhost:9200/[indexname]

查询文档操作

1
POST http://localhost:9200/indexname/_search
  • 查看所有
1
curl -XPOST http://localhost:9200/indexname/_search -H "Content-Type:application/json" -d '{"query":{"match_all":{} } }'
  • 精确匹配(price=549的数据)
1
curl -XPOST http://localhost:9200/indexname/_search -H "Content-Type:application/json" -d '{"query":{"constant_score":{"filter":{"term":{"price":549} } } } }'
  • term query(title=”java”)
1
curl -XPOST http://localhost:9200/indexname/_search -H "Content-Type:application/json" -d '{"query":{"term":{"title":"java"} } }'
  • 分词查询
1
curl -XPOST http://localhost:9200/indexname/_search -H "Content-Type:application/json" -d '{"query":{"match":{"title":"Core Java"} } }'
  • 分词查询(全匹配)
1
2
curl -XPOST http://localhost:9200/indexname/_search -H "Content-Type:application/json" -d 
'{"query":{"match":{"title":{"query":"Core Java", "operator":"and"} } } }'

索引模板

  • dynamic template
1
2
3
4
5
6
7
8
9
10
"dynamic_templates": [
{
"my_template_name": {
... match conditions ...
"mapping": { ... } # match field use mappings
}
},
...
]
# The match conditions can include any of : match_mapping_type, match, match_pattern, unmatch, path_match, path_unmatch.
  1. match_mapping_type
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
put myIndex 
{
"mappings": {
"my_type": {
"dynamic_templates": [
{
"integers": { # template name
"match_mapping_type": "long", # all fileld value long type
"mapping": {
"type": "integer" # recognate it as integer
}
}
},
{
"string_not_analyzed": {
"match_mapping_type": "string", # match all string filed
"mapping": {
"type": "string",
"fields": {
"raw": {
"type": "string",
"index": "not_analyzed",
"ignore_above": 256
}
}
}
}
}
]
}
}
}
  1. match and unmatch

match和unmatch定义应用于filedname的pattern。

定义一个匹配所有以long_开头且不以_text结束的string类型的模板

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
PUT my_index
{
"mappings": {
"my_type": {
"dynamic_templates": [
{
"longs_as_strings": {
"match_mapping_type": "string",
"match": "long_*",
"unmatch": "*_text",
"mapping": {
"type": "long"
}
}
}
]
}
}
}
  • example
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
curl -XPOST http://10.1.1.12:9200/_template/default@template --user elastic:b6fBNAapGEcYz2dt -H "Content-Type:application/json" -d '{
"order" : 1,
"index_patterns" : [
"*"
],
"settings" : {
"index" : {
"max_result_window" : "65536",
"refresh_interval" : "30s",
"unassigned" : {
"node_left" : {
"delayed_timeout" : "5m"
}
},
"translog" : {
"sync_interval" : "5s",
"durability" : "async"
},
"number_of_replicas" : "1"
}
},
"mappings" : {
"dynamic_templates" : [
{
"message_full" : {
"mapping" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 2048,
"type" : "keyword"
}
}
},
"match" : "message_full"
}
},
{
"msg" : {
"mapping" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 2048,
"type" : "keyword"
}
}
},
"match_pattern": "regex",
"match" : "msg|pl_message|json"
}
},
{
"payload_data" : {
"mapping" : {
"type" : "text",
"fields" : {
"keyword" : {
"ignore_above" : 2048,
"type" : "keyword"
}
}
},
"match" : "*payload"
}
},
{
"message" : {
"mapping" : {
"type" : "text"
},
"match" : "message"
}
},
{
"strings" : {
"mapping" : {
"type" : "keyword"
},
"match_mapping_type" : "string"
}
}
]
},
"aliases" : { }
}'

快照

1
2
3
4
5
6
7
8
9
# register a snapshot repository
PUT /_snapshot/my_fs_backup
{
"type": "fs",
"settings": {
"location": "/opt/backup_es",
"compress": true
}
}

location:my_fs_backup_location 路径必须先在elasticsearch.yaml中配置path.repo

1
path.repo: /opt/backup_es
location Location of the snapshots. Mandatory.
compress Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to true.
chunk_size Big files can be broken down into chunks during snapshotting if needed. Specify the chunk size as a value and unit, for example: 1GB, 10MB, 5KB, 500B. Defaults to null (unlimited chunk size).
max_restore_bytes_per_sec Throttles per node restore rate. Defaults to 40mb per second.
max_snapshot_bytes_per_sec Throttles per node snapshot rate. Defaults to 40mb per second.
readonly Makes repository read-only. Defaults to false.

快照策略

SLM

elastic设置密码

elasticsearch.yml增加如下配置

1
2
3
xpack.security.enabled: true
xpack.license.self_generated.type: basic
xpack.security.transport.ssl.enabled: true

重新启动es, 执行

1
bin/elasticsearch-setup-passwords interactive

这里需要为4个用户分别设置密码,elastic, kibana, logstash_system,beats_system,交互输入密码。

修改密码:

1
curl -H "Content-Type:application/json" -XPOST -u elastic 'http://127.0.0.1:9200/_xpack/security/user/elastic/_password' -d '{ "password" : "123456" }'

索引选项

index.refresh_interval

数据索引后并不会马上搜索到,需要刷新后才能被搜索的,这个选项设置索引后多久会被搜索到。

index.translog

  • sync_interval
  • durability

Why yellow

  • 多数据节点故障
  • 为索引使用损坏的或红色的分区
  • 高 JVM 内存压力或 CPU 利用率
  • 磁盘空间不足

Fix yellow

  1. 列出未分配的分区
1
curl -XGET 'localhost:9200/_cat/shards?h=index,shard,prirep,state,unassigned.reason' | grep UNASSIGNED

输出:

1
2
3
4
xxxxx                             0 r UNASSIGNED INDEX_CREATED
yyyyy 0 r UNASSIGNED INDEX_CREATED
zzzzz 0 r UNASSIGNED INDEX_CREATED
rrrrr 0 r UNASSIGNED INDEX_CREATED

展示出所有未分配的分片的列表

  1. 检索为什么未分配
1
curl -XGET 'localhost:9200/_cluster/allocation/explain?pretty' -H 'Content-Type:application/json' -d'{"index": "xxxxx", "shard": 0, "primary":false}'

输出:(未记录输出)

会给出集群中所有节点不能分配的原因。

  1. 解决

如果是磁盘空间不足,删除不必要的索引。对于其他原因,可根据情况解决不能分配的原因。比如下面几个常见的原因。

a. cluster.max_shards_per_node默认为1000,节点分片已经达到最大。

b. 磁盘空间达到配置的阈值,比如磁盘已经达到80%,不会继续分配分片。

c. 分片设置的节点必须是hot节点。

可通过如下接口查看当前磁盘分配配置:

1
curl -XGET _cluster/settings?include_defaults=true&flat_settings=true&pretty

输出(输出太多截取一部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"persistent" : {
"cluster.routing.allocation.disk.watermark.flood_stage" : "95%",
"cluster.routing.allocation.disk.watermark.high" : "90%",
"cluster.routing.allocation.disk.watermark.low" : "85%"
},
"transient" : {
"cluster.max_shards_per_node" : "10000",
"cluster.routing.allocation.disk.watermark.flood_stage" : "95%",
"cluster.routing.allocation.disk.watermark.high" : "90%",
"cluster.routing.allocation.disk.watermark.low" : "85%"
},
.....

索引生存周期(ILM)

适用于单索引并不断增长,可设置ILM rollover,根据大小或者文档条数拆分.

对于按天索引,可配置删除阶段规则.

  1. 创建ILM策略(hot/warm/cold/delete)

  2. 创建索引模板,指定ILM的范围

  3. 创建rollover的索引,名称末尾要是数字,这样rollover就会+1, 如:carlshi-00001;配置is_write_index选项

  4. 原索引写入数据

For Example:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 创建索引模板
PUT /_template/carl_template
{
"index_patterns": [ # 匹配的索引名称
"carl-*"
],
"settings": {
"refresh_interval": "30s",
"number_of_shards": "1",
"number_of_replicas": "0"
},
"mappings": { # mapping
"properties": {
"name": {
"type": "keyword"
}
}
}
}

创建索引:

1
2
3
4
5
6
7
8
9
# 创建第一个索引
PUT /carlshi-000001
{
"aliases": {
"carlshi-index": { # 索引alias,写入carlshi-index的都会写入carlshi-00001
"is_write_index": true
}
}
}

elasticsearch docker

直接运行elasticsearch,会自动拉去镜像并执行;

1
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.5.1 -v /usr/share/elasticsearch/data:/usr/share/elasticsearch/data

运行成功后,执行curl,获取基本信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
curl localhost:9200
{
"name" : "be856c56d8bd",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "bsnwunE2SnWcBIqoxbgnUw",
"version" : {
"number" : "7.5.1",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "3ae9ac9a93c95bd0cdc054951cf95d88e1e18d96",
"build_date" : "2019-12-16T22:57:37.835892Z",
"build_snapshot" : false,
"lucene_version" : "8.3.0",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}

小结

ElasticSearch是一款强大的全文检索工具,他提供REST API使得使用ElasticSearch非常简单,对数据做了很强的高可用,也可根据自己的需求配置不同级别的高可用、高性能全文检索工具。

本篇主要讲解对ElasticSearch的常用模块做了简单的介绍,索引的基本属性基本操作(增删改查),动态索引模式模板,快照备份,索引生存周期;还记录了集群黄色的排查方向。以后逐步深入各个模块的配置甚至内部实现原理。

日志处理之logstash

Introduction

Logstash是一个开源数据收集引擎,具有实时管道功能。Logstash将来自不同数据源的数据统一搜集起来,并根据需求将数据标准化输出到你所选择的目的地。如下图所示。

img

Input/Filter/Output

Logstash可以从多个数据源获取数据,并对其进行处理、转换,最后将其发送到不同类型的“存储”

输入

采集各种样式、大小和来源的数据

分布式系统中,数据往往是以各种各样的形式(结构化、非结构话)存在于不同的节点中。Logstash 支持不同数据源的选择 ,日志、报表、数据库的内容等等。可以在同一时间从众多常用来源捕捉事件。

  • 文件类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
input{
file{
# path属性接受的参数是一个数组,其含义是标明需要读取的文件位置
path => [‘pathA’,‘pathB’]

# 表示多就去path路径下查看是够有新的文件产生。默认是15秒检查一次。
discover_interval => 15

# 排除那些文件,也就是不去读取那些文件
exclude => [‘fileName1’,‘fileNmae2’]

# 被监听的文件多久没更新后断开连接不在监听,默认是一个小时。
close_older => 3600

# 在每次检查文件列 表的时候, 如果一个文件的最后 修改时间 超过这个值, 就忽略这个文件。 默认一天。
ignore_older => 86400

# logstash 每隔多久检查一次被监听文件状态(是否有更新), 默认是 1 秒。
stat_interval => 1

#sincedb记录数据上一次的读取位置的一个index
sincedb_path => ’$HOME/. sincedb‘

#logstash 从什么 位置开始读取文件数据, 默认是结束位置 也可以设置为:beginning 从头开始
start_position => ‘beginning’

# 注意:这里需要提醒大家的是,如果你需要每次都从开始读取文件的话,只设置start_position => beginning是没有用的,你可以选择sincedb_path 定义为 /dev/null
}

}
  • 数据库类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
input{
jdbc{
# jdbc sql server 驱动,各个数据库都有对应的驱动,需自己下载
jdbc_driver_library => "/etc/logstash/driver.d/sqljdbc_2.0/enu/sqljdbc4.jar"
#jdbc class 不同数据库有不同的 class 配置
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
#配置数据库连接 ip 和端口,以及数据库
jdbc_connection_string => "jdbc:sqlserver://xxxxxx:1433;databaseName=test_db"
#配置数据库用户名
jdbc_user =>
#配置数据库密码
jdbc_password =>

# 上面这些主要配置数据库java驱动,账号配置
# 定时器 多久执行一次SQL,默认是一分钟
# schedule => 分 时 天 月 年
# schedule => * 22 * * * 表示每天22点执行一次
schedule => "* * * * *"
# 是否清除 last_run_metadata_path 的记录,如果为真那么每次都相当于从头开始查询所有的数据库记录
clean_run => false
# 是否需要记录某个column 的值,如果 record_last_run 为真,可以自定义我们需要表的字段名称,
#此时该参数就要为 true. 否则默认 track 的是 timestamp 的值.
use_column_value => true
#如果 use_column_value 为真,需配置此参数. 这个参数就是数据库给出的一个字段名称。当然该字段必须是递增的,可以是 数据库的数据时间这类的
tracking_column => create_time
#是否记录上次执行结果, 如果为真,将会把上次执行到的 tracking_column 字段的值记录下来,保存到 last_run_metadata_path 指定的文件中
record_last_run => true
# 我们只需要在 SQL 语句中 WHERE MY_ID > :last_sql_value 即可. 其中 :last_sql_value 取得就是该文件中的值
last_run_metadata_path => "/etc/logstash/run_metadata.d/my_info"
# 是否将字段名称转小写。
# 这里有个小的提示,如果你之前就处理过一次数据,并且在Kibana中有对应的搜索需求的话,还是改为true,
# 因为默认是true,并且Kibana是大小写区分的。准确的说应该是ES大小写区分
lowercase_column_names => false
# 你的SQL的位置,当然,你的SQL也可以直接写在这里。
# statement => SELECT * FROM tabeName t WHERE t.creat_time > :last_sql_value
statement_filepath => "/etc/logstash/statement_file.d/my_info.sql"
# 数据类型,标明数据来源,es索引的时候可以建立不同的额索引
type => "my_info"
}
# 注意:外在的SQL文件就是一个文本文件就可以了,还有需要注意的是,一个jdbc{}插件就只能处理一个SQL语句,
# 如果你有多个SQL需要处理的话,只能在重新建立一个jdbc{}插件。
}
  • beats

主要是接受filebeats的数据导入

1
2
3
4
5
6
7
8
input {
beats {
# 接受数据端口
port => 5044
# 数据类型
type => "logs"
}
}

过滤器

实时解析和转换数据

数据从源传输到存储库的过程中,需要对不同的数据进行不同的存储,Logstash 过滤器能够解析每条记录,识别每条数据的字段内容,并将它们转换成自定义数据,以便进行处理分析计算。

Logstash 动态地转换和解析数据,支持各种格式或复杂度数据的解析:

  • 利用 Grok 从非结构化数据中派生出结构
  • 从 IP 地址破译出地理坐标
  • 将 PII 数据匿名化,完全排除敏感字段
  • 整体处理不受数据源、格式或架构的影响

输出

尽管 ES是logstash的常用输出方向,能够为我们的搜索和分析带来无限可能,但它并非唯一选择。

Logstash 提供众多输出选择,您可以将数据发送到您要指定的地方,并且能够灵活地解锁众多下游用例。

img

Install && config

  • 安装

安装比较简单,官网直接有现成的二进制包,下载地址: https://artifacts.elastic.co/downloads/logstash/logstash-7.10.1-linux-x86_64.tar.gz

安装也比较简单,解压设置path即可使用。

本人经常使用,就写了个安装elk的脚本,需要的可以拿去使用:https://github.com/shiguofu2012/scripts/blob/master/install_elk.sh。

  • 配置intput/output

Logstash配置有两个必需的元素,输入和输出,以及一个可选过滤器。输入插件从数据源那里消费数据,过滤器插件根据你的期望修改数据,输出插件将数据写入目的地。

img

  1. 接下来,允许Logstash最基本的管道,例如:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
[root@VM-145-82-centos ~]# logstash -e 'input { stdin {} } output { stdout {} }'
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2021-01-07T22:15:40,409][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"7.9.3", "jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc Java HotSpot(TM) 64-Bit Server VM 25.261-b12 on 1.8.0_261-b12 +indy +jit [linux-x86_64]"}
[2021-01-07T22:15:40,803][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2021-01-07T22:15:42,097][INFO ][org.reflections.Reflections] Reflections took 39 ms to scan 1 urls, producing 22 keys and 45 values
[2021-01-07T22:15:43,158][INFO ][logstash.javapipeline ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["config string"], :thread=>"#<Thread:0x41654bea run>"}
[2021-01-07T22:15:43,809][INFO ][logstash.javapipeline ][main] Pipeline Java execution initialization time {"seconds"=>0.64}
[2021-01-07T22:15:43,866][INFO ][logstash.javapipeline ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2021-01-07T22:15:43,914][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2021-01-07T22:15:44,235][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
{
"host" => "VM-145-82-centos",
"message" => "",
"@timestamp" => 2021-01-07T14:15:43.902Z,
"@version" => "1"
}
hello
{
"host" => "VM-145-82-centos",
"message" => "hello",
"@timestamp" => 2021-01-07T14:15:47.996Z,
"@version" => "1"
}

{
"host" => "VM-145-82-centos",
"message" => "",
"@timestamp" => 2021-01-07T14:15:50.766Z,
"@version" => "1"
}

从标准输入获取数据,输出到标准输出。

  1. input 从filebeat获取数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
input {
beats {
host => "0.0.0.0" # 默认是127.0.0.1 只能本级访问
port => 5044
}
}
# output 索引至es
output {
elasticsearch {
hosts => ["localhost:9200"] # es地址
user => "xxxx" # 用户名
password => "xxxx" # 密码
index => "test-ap-%{+YYYY.MM.dd}" # 建立的索引,这里默认每天建一个索引
}
}

总体来讲,input/output是比较容易配置的,关键是对数据进行格式化。

  • filter
正则匹配

grok 匹配非格式化字段,提取字段格式化数据,强大的文本解析工具,支持正则表达式

1
2
3
4
5
6
7
8
9
10
11
grok {
match => { "[message]" => "%{TIMESTAMP_ISO8601:_timestamp} %{LOGLEVEL:level} %{DATA:stack}" }
}
# 解析失败的处理
if "_grokparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
}
ip解析
1
2
3
4
5
6
filter {
geoip {
source => "ip"
fields => ["city_name", "timezone"] # 选择解析的字段
}
}

解析出来的数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
"ip" => "183.60.92.253",
"@version" => "1",
"@timestamp" => "2014-08-07T10:32:55.610Z",
"host" => "raochenlindeMacBook-Air.local",
"geoip" => {
"ip" => "183.60.92.253",
"country_code2" => "CN",
"country_code3" => "CHN",
"country_name" => "China",
"continent_code" => "AS",
"region_name" => "30",
"city_name" => "Guangzhou",
"latitude" => 23.11670000000001,
"longitude" => 113.25,
"timezone" => "Asia/Chongqing",
"real_region_name" => "Guangdong",
"location" => [
[0] 113.25,
[1] 23.11670000000001
]
}
}
字段增删改
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
filter {
mutate {
remove_field => ["ecs", "input", "agent", "tags", "@version", "@metadata"] # 移除字段
rename => ["host", "my_host"] # 重命名
rename => ["kubernetes", "my_k8s"] # 重命名
remove_field => ["[host][mac]", "[my_host][containerized]", "[my_host][os]", "[my_host][id]", "[my_host][name]", "[my_host][architecture]"] # 移除字段,使用已经重命名的字段
add_field => { "mytype" => "type" } # 增加字段

update => { "sample" => "My new message" } # 更新字段内容,如果字段不存在,不会新建
replace => { "message" => "%{source_host}: My new message" } # 与 update 功能相同,区别在于如果字段不存在则会新建字段
convert => ["request_time", "float"] # 数据类型转换
uppercase => [ "fieldname" ] # 大写转换
lowercase => [ "fieldname" ]

# 提供正则表达式替换
gsub => [
# replace all forward slashes with underscore
"fieldname", "/", "_",
# replace backslashes, question marks, hashes, and minuses
# with a dot "."
"fieldname2", "[\\?#-]", "."
]
}
}
条件判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
filter {
# 条件判断,字段my_k8s是否存在; 并且日志路径匹配
if ![my_k8s] and [log][file][path] =~ "/data/project_log/[\w-]+/[\w-\\.]+.log" {
mutate {
split => ["[log][file][path]", "/"]
# split操作 /data/project_log/app1/app.log => ["", data, project_log, app1, app.log]
add_field => { "[kubernets][labels][app]" => "%{[log][file][path][3]}" }
}
}
# 字段操作放在mutate中
mutate {
remove_field => [ "log" ]
}
}
json
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
filter {
# 检查是否是json格式
if [message] =~ "^\{.*\}[\s\S]*$" {
json {
source => "[message]"
target => "jsoncontent"
}
# json 数据失败
if "_jsonparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
}
}

Example

这里介绍一个曾经搭建的ELK日志系统。

结构比较简单,kubetnets中filebeat damonSet方式运行,搜集所有container 标准输出的日志,并传入logstash中,logstash将数据导入elasticsearch。结构图如下所示:

image-20210111133718737

下面开始logstash的配置:

input比较简单,使用filebeat搜集日志,传入logstash

1
2
3
4
5
6
input {
beats {
host => "0.0.0.0"
port => 5044
}
}

output增加了几个条件判断,根据不同的字段日志类型,索引到不同的es索引中;如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
output {
# k8s app 索引到对应的app中
if ([kubernetes][labels][app]) {
if ([type] == "app") {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-%{+YYYY.MM.dd}"
}
# 根据type区分索引
} else if ([type] == "user") {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-[type]%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{[kubernetes][labels][app]}-%{+YYYY.MM.dd}"
}
}

# 不存在k8s app字段
} else if ([type] == "user") {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-%{+YYYY.MM.dd}"
}
} else if ([type] == "app") {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["localhost:9200"]
index => "default-v1.0.0"
}
}
}

filter 配置,不同的日志格式,输出格式化的数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
filter {
mutate {
# 移除filebeat发送的多余字段
remove_field => ["ecs", "input", "agent", "tags", "@version", "@metadata"]
remove_field => ["[host][mac]", "[host][containerized]", "[host][os]", "[host][id]", "[host][name]", "[host][architecture]"]
}
if ![kubernetes] and [log][file][path] =~ "/data/app_log/[\w-]+/[\w\.-]+.log" {
mutate {
split => ["[log][file][path]", "/"]
add_field => { "[kubernetes][labels][app]" => "%{[log][file][path][3]}-host" }
}
}
mutate {
remove_field => [ "log" ]
}

if [message] =~ "^\{.*\}[\s\S]*$" {
# json格式处理
json {
source => "[message]"
}
if "_jsonparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}

if ([time]) {
date {
match => ["time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}

mutate {
remove_field => ["time"]
}
}

# docker 日志格式
if [log] =~ "^\{.*\}[\s\S]*$" {
json {
source => "[log]"
}

mutate {
remove_field => ["log"]
}

if ([start_time]) {
date {
match => ["start_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "start_time"
}
}

if ([app_start_time]) {
date {
match => ["app_start_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "app_start_time"
}
}

if ([end_time]) {
date {
match => ["end_time", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "end_time"
}
}

if ([timestamp]) {
date {
match => ["timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}
}
} else {
if ([timestamp]) {
date {
match => ["timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}
}
}
} else {
# 匹配日志格式
grok {
match => { "[message]" => "%{TIMESTAMP_ISO8601:_timestamp} %{LOGLEVEL:level} %{DATA:stack} - (?<message>(.|\r|\n|\t)*)" }
}
# 匹配格式失败处理
if "_grokparsefailure" in [tags] {
mutate {
rename => ["message", "msg"]
remove_field => ["tags"]
}
}
# 解析时间格式
date {
match => ["_timestamp", "ISO8601", "UNIX", "UNIX_MS", "TAI64N"]
target => "@timestamp"
}

mutate {
remove_field => ["_timestamp"]
}
}

mutate {
remove_field => ["message"]
}
}

总结

总之 ,logstash具备强大的功能,将不同数据源的数据经过清洗格式化,转化为结构化的数据,存储到不同的存储单元。

gRPC tracing

介绍

gRPC

gRPC是什么可以用官网的一句话来概括

A high-performance, open-source universal RPC framework

所谓RPC(remote procedure call 远程过程调用)框架实际是提供了一套机制,使得应用程序之间可以进行通信,而且也遵从server/client模型。使用的时候客户端调用server端提供的接口就像是调用本地的函数一样。如下图所示就是一个典型的RPC结构图。

img

gRPC vs. Restful API

既然是server/client模型,那么我们直接用restful api不是也可以满足吗,为什么还需要RPC呢?

gRPC和restful API都提供了一套通信机制,用于server/client模型通信,而且它们都使用http作为底层的传输协议(严格地说, gRPC使用的http2.0,而restful api则不一定)。不过gRPC还是有些特有的优势,如下:

  • gRPC可以通过protobuf来定义接口,从而可以有更加严格的接口约束条件。
  • 另外,通过protobuf可以将数据序列化为二进制编码,这会大幅减少需要传输的数据量,从而大幅提高性能。
  • gRPC可以方便地支持流式通信(理论上通过http2.0就可以使用streaming模式, 但是通常web服务的restful api似乎很少这么用,通常的流式数据应用如视频流,一般都会使用专门的协议如HLS,RTMP等,这些就不是我们通常web服务了,而是有专门的服务器应用。)

Tracing

微服务遍地都是,一个功能,一个接口都可能是一个微服务,微服务之间的调用混乱,无法追踪,很难找出瓶颈点,因此迫切需要一种方法来追踪服务之间的调用链路。

  • MetaData

Metadata 可以理解为一个 HTTP 请求的 Header(它的底层实现就是 HTTP/2 的 Header),用户可以通过访问和修改每个 gRPC Call 的 Metadata 来传递额外的信息:比如认证信息,比如用于追踪的 Request ID。

  • interceptor

Interceptor 有点类似于我们平时常用的 HTTP Middleware,不同的是它可以用在 Client 端和 Server 端。比如在收到请求之后输出日志,在请求出现错误的时候输出错误信息,比如获取请求中设置的 Request ID。

  • Golang实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs.
type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error

// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC
// and it is the responsibility of the interceptor to call it.
// This is an EXPERIMENTAL API.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

// UnaryHandler defines the handler invoked by UnaryServerInterceptor to complete the normal
// execution of a unary RPC. If a UnaryHandler returns an error, it should be produced by the
// status package, or else gRPC will use codes.Unknown as the status code and err.Error() as
// the status message of the RPC.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info
// contains all the information of this RPC the interceptor can operate on. And handler is the wrapper
// of the service method implementation. It is the responsibility of the interceptor to invoke handler
// to complete the RPC.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

Golang 的实现是把 Metadata 塞在了 context 里面,只需要使用 metadata.FromOutgoingContext(ctx)metadata.FromIncomingContext(ctx) 就能够访问本次请求的 Metadata。概念清楚之后代码应该非常好写了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
const requestIdKey = "requestId"

// client 请求RPC增加MetaData拦截器
func RequestIDClientInterceptor() grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string, req, resp interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption,
) (err error) {
// 获取请求的MetaData
md, ok := metadata.FromOutgoingContext(ctx)
if !ok {
// 未添加则创建
md = metadata.Pairs()
}

value := ctx.Value(requestIdKey)
if requestID, ok := value.(string); ok && requestID != "" {
// md 不区分大小写,内部会转换为小写
md.Set(requestIdKey, requestID)
}
return invoker(metadata.NewOutgoingContext(ctx, md), method, req, resp, cc, opts...)
}
}

// server端获取MetaData数据
func RequestIDServerInterceptor() grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler,
) (resp interface{}, err error) {
// 获取请求过来的MetaData
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
md = metadata.Pairs()
}
// 后去请求的requstId,并设置到当前context中
requestIDs := md[requestIdKey]
if len(requestIDs) >= 1 {
ctx = context.WithValue(ctx, requestIdKey, requestIDs[0])
}
return handler(ctx, req)
}
}

日志切割工具 logrotate 配置

1. 安装

Linux一般是默认自带logrotate的,如果没有可以使用yum/apt安装

apt/yum 安装

1
2
3
4
5
6
[root@VM-145-82-centos ~]# yum install logrotate
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
Package logrotate-3.8.6-14.tl2.x86_64 already installed and latest version
Nothing to do
[root@VM-145-82-centos ~]#
1
2
3
4
5
6
root@VM-0-15-ubuntu:[10:59:47]:~# apt-get install logrotate
Reading package lists... Done
Building dependency tree
Reading state information... Done
logrotate is already the newest version (3.8.7-2ubuntu2.16.04.2).
0 upgraded, 0 newly installed, 0 to remove and 385 not upgraded.

源码安装

github地址: https://github.com/logrotate/logrotate

按照github安装

2. 配置

logrotate是利用系统crontab定时执行的,在目录/etc/cron.daily中有个logrotate的脚本。如果需要,可以在cron.hourly,也可以在/etc/crontab中增加自己的配置。

这些配置都是独立的,结合自己的配置以及服务的日志量来自定义达到最优配置。

1
2
3
4
5
6
7
8
#!/bin/sh

/usr/sbin/logrotate -s /var/lib/logrotate/logrotate.status /etc/logrotate.conf
EXITVALUE=$?
if [ $EXITVALUE != 0 ]; then
/usr/bin/logger -t logrotate "ALERT exited abnormally with [$EXITVALUE]"
fi
exit 0

配置文件主要存放在目录

1
2
[root@VM-145-82-centos ~]# ls /etc/logrotate.d/
conman cron.30m iptraf-ng mgetty mongodb mysql psacct syslog yum

一般安装会添加常用组件的配置,在生产环境中,我们会自定义一些配置。

如下是本人常用的配置:

1
2
3
4
5
6
7
8
9
10
11
/data/log/*.log
{
daily
notifempty
copytruncate
compress
rotate 60
missingok
dateext
dateformat -%s
}
配置参数 说明
monthly 日志文件将按月轮循。其它可用值为’daily’,’weekly’或者’yearly’。
rotate 5 一次将存储5个归档日志。对于第六个归档,时间最久的归档将被删除。
compress 在轮循任务完成后,已轮循的归档将使用gzip进行压缩。
delaycompress 总是与compress选项一起用,delaycompress选项指示logrotate不要将最近的归档压缩,压缩将在下一次轮循周期进行。这在你或任何软件仍然需要读取最新归档时很有用。
missingok 在日志轮循期间,任何错误将被忽略,例如“文件无法找到”之类的错误。
notifempty 如果日志文件为空,轮循不会进行。
create 644 root root 以指定的权限创建全新的日志文件,同时logrotate也会重命名原始日志文件。
size 日志文件大小的配置,如果没达到这个大小,将不会压缩
dateext 压缩文件带上日期,默认会使用编号(log.log.1.gz),该选项会是每次压缩都带上日期,如 log.log-2020
dateformat 日期格式 支持 %Y%m%d(年月日) %s(时间戳)
postrotate/endscript 在所有其它指令完成后,postrotate和endscript里面指定的命令将被执行。在这种情况下,rsyslogd 进程将立即再次读取其配置并继续运行。

3. 执行频度定制

logrotate是在cron中执行的,因此要自定义执行频度,可以增加crontab配置

比如,我们的服务需要每半小时压缩一次 hourly是不满足我们的需求,就需要在crontab中增加一条记录

1
2
3
4
[root@VM-145-82-centos /etc/logrotate.d/cron.30m]# crontab -e

# rotate nginx log every 30min
*/30 * * * * /usr/sbin/logrotate /etc/logrotate.d/cron.30m/* -f

可以将30min执行的所有配置文件放到一个目录,执行30min命令。

4. 手动执行

配置文件是否配置成功,执行后是什么效果。logrotate提供测试的功能。

1
2
3
4
5
6
7
8
9
10
11
[root@VM-145-82-centos /etc/logrotate.d/cron.30m]# logrotate -d /etc/logrotate.d/cron.30m/*
reading config file /etc/logrotate.d/cron.30m/nginx
Allocating hash table for state file, size 15360 B

Handling 1 logs

rotating pattern: /var/log/nginx/*log after 1 days (10 rotations)
empty log files are not rotated, old logs are removed
considering log /var/log/nginx/access.log
log does not need rotating (log is empty)considering log /var/log/nginx/error.log
log does not need rotating (log is empty)not running postrotate script, since no logs were rotated

执行后,会输出文件怎么变更,压缩重命名等。比如上面会提示 “old logs are removed” “10 rotate” 等记录。

总结

logrotate工具对于防止因庞大的日志文件而耗尽存储空间是十分有用的。配置完毕后,进程是全自动的,可以长时间在不需要人为干预下运行。可以根据需求及日志量定制自己的日志切割规则。

Golang context 源码分析

一般接触新东西都会有三问(what/why/how),我们也从这几个角度看下golang context。

WHAT IS CONTEXT

Goroutine和Channel是go语言的特色,并发编程会使用协程来处理工作,通道用来协程之间的通信。主协程创建子协程,子协程再创建子协程,就会形成一个协程树状结构,协程之间可以采用通道来进行通信。
每个Goroutine在执行之前,都要先知道程序当前的执行状态,通常将这些执行状态都会封装Context变量中,传递给要执行的Goroutine中。比如在网络编程中,处理一个网络Request时,我们可能需要开启不同的Goroutine来获取数据与逻辑处理。而这些Goroutine可能需要共享Request的一些信息;同时当Request异常、超时或者被取消的时候,所有从这个Request创建的所有Goroutine都应该被结束。

WHY CONTEXT

上面我们说过,由于异常、超时或者被取消的情况,都应该结束该Request创建的所有Goroutine。假如不用context,我们看怎么去控制协程树。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package services

import (
"fmt"
"time"
)

func StartWork() {
data := make(chan int, 10)
done := make(chan struct{}) // 使用一个chan来控制协程的退出

defer close(data)
// consumer
go func() {
for {
select {
case <-done:
fmt.Println("child process interrupt...")
return
default:
fmt.Printf("send message: %d\n", <-data)
time.Sleep(time.Second * 1)
}
}
}()

// producer
for i := 0; i < 10; i++ {
data <- i
}
time.Sleep(5 * time.Second)
// 退出协程
close(done)
time.Sleep(1 * time.Second)
fmt.Println("main process exit!")
}

子协程运行着任务,如果主协程需要在某个时刻发送消息通知子协程中断任务退出,那么就可以让子协程监听这个done channel,一旦主协程关闭done channel,那么子协程就可以推出了,这样就实现了主协程通知子协程的需求。这种方式运行良好,但是这也是有限的。
考虑这种情况:如果主协程中有多个任务1, 2, …n,主协程对这些任务有时间限制;而任务1又有多个子任务1, 2, …,k, 任务1对这些子任务也有自己的时间控制,那么这些子任务既要感知主协程的取消信号,也需要感知任务1的取消信号。
如果使用done channel的用法,这个时候需要定义两个done channel,子任务们需要同时监听这两个done channel,这样也可以正常work。但是如果层级更深,如果这些子任务还有子任务,那么使用done channel的方式将会变得非常繁琐且混乱。
这个时候 context出场了。

context interface

1
2
3
4
5
6
type Context interface {
Deadline() (deadline time.Time, ok bool)
Done() <-chan struct{}
Err() error
Value(key interface{}) interface{}
}

context包含四个方法

  • Deadline 返回绑定当前context的任务执行的截止时间,如果没有截止时间,则ok==false
  • Done 该函数返回一个只读的chan 数据是空结构体,类似我们上面的done,控制协程的退出
  • Err 如果协程退出,该函数返回退出的原因,被取消或者执行超时等错误
  • Value 获取context中传递的值

可以看到Done方法返回的chan正是用来传递结束信号以抢占并中断当前任务的;Deadline方法用来表示gorotine是否在指定时间被取消;Err方法是用来解释goroutine被取消的原因;而Value则用于获取特定于当前任务的数据。
而context所包含的额外信息键值对是如何存储的呢?其实可以想象一颗树,树的每个节点可能携带一组键值对,如果当前节点上无法找到key所对应的值,就会向上去父节点里找,直到根节点,具体后面会说到。

下面我们看下context的两个比较常用的实现,valueCtx/cancelCtx

valueCtx

1
2
3
4
type valueCtx struct {
Context
key, val interface{}
}

结构比较简单就是增加了key/val字段存储键值对
WithValue用以向其中存储键值对

1
2
3
4
5
6
7
8
9
func WithValue(parent Context, key, val interface{}) Context {
if key == nil {
panic("nil key")
}
if !reflect.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

这里添加键值对不是直接在原context结构体上直接添加,而是重新创建一个新的valueCtx节点,作为原ctx的子节点,并将键值对添加在子节点上,由此形成一条链表。获取value就是从这条链表上尾部向前搜寻(代码如下):

1
2
3
4
5
6
func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

valueCtx的增加值与获取值的过程如下:
With就是往链表的尾部增加节点,value就是从尾部开始获取对应的值,找到就退出;否则找到头部返回空.

enter image description here

cancelCtx

1
2
3
4
5
6
7
8
9
10
11
12
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

type cancelCtx struct {
Context
mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

cancelCtx是一个可以取消的 Context,实现了 canceler 接口。它直接将接口 Context 作为它的一个匿名字段,这样,它就可以被看成一个 Context。done是一个空结构体类型的channel,用来传递关闭信号,在协程中一般结合select来监听父协程退出信号;children是一个map,存储了所有可取消context的子节点,这样任意层级的context取消,都会给所有子context发送取消信号;err用于存储错误信息表示任务结束的原因。
先看下Done方法:

1
2
3
4
5
6
7
8
9
func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}

c.done懒汉式创建,调用Done方法才会创建。返回一个只读的chan,一般结合select监听。一旦取消,立马可读。
重点看下cancel方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
// 必须要传 err
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // 已经被其他协程取消
}
// 给 err 字段赋值
c.err = err
// 关闭 channel,通知其他协程
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}

// 遍历它的所有子节点
for child := range c.children {
// 递归地取消所有子节点
child.cancel(false, err)
}
// 将子节点置空
c.children = nil
c.mu.Unlock()

if removeFromParent {
// 从父节点中移除自己
removeChild(c.Context, c)
}
}

总体来看,cancel() 方法的功能就是关闭 channel:c.done;递归地取消所有的子节点;将自己从父节点树中摘掉。通过关闭 channel,goroutine 这边接收取消信号的方式就是 select 语句中的读 ctx.Done 可读,执行相应的退出函数。
这里有个比较巧妙的地方,调用子节点 cancel 方法的时候,传入的第一个参数是 false,最终子节点是没有调用removeChild,把自己从父节点移除。
移除的操作很简单,找到最近的可取消的祖先节点,将自己从其map中删除。最关键的一行:delete(p.children, child)。

1
2
3
4
5
6
7
8
9
10
11
12
13
func removeChild(parent Context, child canceler) {
// 查找最近的可取消节点
p, ok := parentCancelCtx(parent)
if !ok {
return
}
p.mu.Lock()
if p.children != nil {
// 从最近的祖先节点的map中移除自己
delete(p.children, child)
}
p.mu.Unlock()
}

那什么时候需要移除,什么时候不需要移除呢?
我们先来看下创建一个可取消的context。
WithCancel函数创建一个可取消的context,即cancelCtx类型的context,传入一个父context,返回context和CancelFunc,调用CancelFunc即可触发cancel操作。直接看源码:

1
2
3
4
5
6
7
8
9
10
11
type CancelFunc func()

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) } // 这里cancel传入了true
}

func newCancelCtx(parent Context) cancelCtx {
return cancelCtx{Context: parent}
}

可以看到,只有在使用WithCancel创建context的时候,返回的cancelFunc会传入true。这样当调用cancelFunc 时,会将这个 可取消的context从它的父节点里移除,因为父节点可能有很多子节点,取消之后要和父节点断绝关系,对其他没影响。而对于该context的所有子节点都会因为该节点的cancelFunc调用c.children = nil而化为灰烬,没有必要再一个一个移除。

enter image description here

如上左图,代表一棵 context 树,当然也可以看做是一个协程树。当调用左图子context的cancel 方法后,该 context会依次调用children中的cancel的方法,此时子context不会移除自己;该context将自己从它的父 context中去除掉了:从children中delete,实线箭头变成了虚线。且虚线圈框出来的 context 都被取消了,圈内的 context间的父子关系都被取消掉了。
再重点看propagateCancel():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
func propagateCancel(parent Context, child canceler) {
// 父节点是个空节点
if parent.Done() == nil {
return // parent is never canceled
}
// 找到可以取消的父 context
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
if p.err != nil {
// 父节点已经被取消了,本节点(子节点)也要取消
child.cancel(false, p.err)
} else {
// 父节点未取消
if p.children == nil {
p.children = make(map[canceler]struct{})
}
// "挂到"父节点上
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
// 如果没有找到可取消的父 context。新启动一个协程监控父节点或子节点取消信号
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

上面这段代码的功能就是向上寻找可以“依靠”的“可取消”的context,并且“挂靠”上去。这样,调用上层cancel方法的时候,就可以层层传递,将那些挂靠的子context同时“取消”。
这里也有一个比较巧妙的设计,就是else的情况。起初我一直不理解,怎么可能会有else的情况发生,parent.Done()不为空,怎么会找不到可取消的父节点。这里要翻看parentCancelCtx的源码了。。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
for {
switch c := parent.(type) {
case *cancelCtx:
return c, true
case *timerCtx:
return &c.cancelCtx, true
case *valueCtx:
parent = c.Context
default:
return nil, false
}
}
}

这里只会找三种Context类型:cancelCtx,timerCtx,*valueCtx。若是把Context内嵌到自定义类型里,就识别不出来了,就会走default了。为避免这种情况父节点取消传递不到当前节点之后的节点,重新启动了一个协程来传递这种情况,所以使用可取消的context的时候,尽量避免将ctx塞入自定义的结构里,不然会多一个协程来处理。
另一个巧妙的地方就是select

1
2
3
4
5
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}

同时监听了父节点是否退出,也监听当前节点是否退出。这二者缺一不可。

第一个case 好理解,上层几点取消要继续传递下去,就监听了上层是否被取消。

第二个case 如果子节点自己退出了,那就不需要这个协程了,他的作用就是为了连接上层节点与自己。但如果去掉这个case,上层协程一直不取消,那这个goroutine就泄漏了。

总结

context主要用于处理父子协程之间的同步取消信号,本质上是一种协程同步调用协调的方式。
在使用context的时候,有两点要注意:

  • 父协程只是使用context通知所有的子协程,我已经不需要你了,但不会直接干涉和中断下游任务的执行,具体的操作是由子协程决定操作,因此子协程要使用select来监听ctx.Done。
  • context是线程安全的,可以放心地在多个协程中传递使用。

Leetcode 两个正序数组的中位数

leetcode 两个正序数组的中位数

题目

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
给定两个大小为 m 和 n 的正序(从小到大)数组 nums1 和 nums2。

请你找出这两个正序数组的中位数,并且要求算法的时间复杂度为 O(log(m + n))。

你可以假设 nums1 和 nums2 不会同时为空。

 

示例 1:

nums1 = [1, 3]
nums2 = [2]

则中位数是 2.0
示例 2:

nums1 = [1, 2]
nums2 = [3, 4]

则中位数是 (2 + 3)/2 = 2.5

正常思路

这道题,感觉在于理解什么是中位数

基本思路就是把两个数组合并,但是只需要合并到 总数/2 就可以了 后面可以忽略

代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class Solution(object):
def findMedianSortedArrays(self, nums1, nums2):
"""
:type nums1: List[int]
:type nums2: List[int]
:rtype: float
"""
if len(nums1) < len(nums2):
nums1, nums2 = nums2, nums1
index1 = 0
index2 = 0
total = len(nums1) + len(nums2)
while index2 < len(nums2) and index1 <= total / 2: # 结束条件
if index1 < len(nums1): # 两数组等长,数组1最大的数小雨数组2最小的数,会导致index1==len(nums1) index1 out of range
if nums1[index1] > nums2[index2]:
nums1.insert(index1, nums2[index2])
index2 += 1
else:
index1 += 1
else:
nums1.append(nums2[index2]) # 只要把数组2append到最后
index2 += 1

if total % 2 == 0:
return (nums1[int(total / 2) - 1] + nums1[int(total / 2)]) / 2.0
else:
return nums1[total / 2]

新的思路

我们这个算法时间复杂度是 O(m+n) ,题目有个要求 算法时间复杂度 O(log(m + n)),倘若没这个要求 上面的算法就可以完成了,但是有复杂度 并且是log的,这个时候我们可以想起是二分查找。

先来理解一下, 两个有序数组,中位数 可以理解寻找两个数据的第n/2+1大的数(奇数个数)或者第n/2与n/2+1个大的数的平均值。

我们思考,既然是二分查找,思路应该是第一次二分排除掉一部分 然后继续找。

假设 两个数组A与B, 要找第k大的数, 我们可以比较 A[k/2-1]与B[k/2-1] ,为啥?

因为 A与B数组前面各有 k/2-2个数 ,比较他俩 有几种情况:

  • A[k/2-1]>B[k/2-1]

这说明A[k/2-1] 前面那个数(也就是A[k/2-2]) 要么等于A[k/2-1]要么大于A[k/2-1] (因为A有序) ,再来看A[k/2-2]与B数组的关系, 他与B[k/2-1] 的关系,也就是如果合并,A[k/2-2]一定是在B[k/2-1]这个数后面,也就是说A[k/2-2]前面有 B[k/2-1]这个数前面的所有数 有k/2-1个+A数组前面 k/2-2 = k-3个数,这第k个数 一定是在 B[k/2] ~ B[n] 与 A[0] ~ A[m]之间 这个时候可以把 B[k/2] ~ B[n] 看称一个新的数组, A[0] ~ A[m]看成另一个数组,这个时候k已经变成了k-(k/2-1)了, 也就是新的两个数组的k-(k/2-1)大的数, 第继续比较 两个数组的 k/2-1个数,如此循环。

  • A[k/2-1]<B[k/2-1]

这个时候跟上面的情况相反,只需要间隔A折半 A[k/2] ~A[m] 与 B[0] ~ B[n]

  • A[k/2-1]==B[k/2-1]

相等的情况, 前面有k-4个数 他俩一个是 k-3 一个是k-2 没找到第k个数 还是要找,肯定是在A[k/2] ~ A[m] 与 B[k/2] ~ B[n] 这里可以按照跟情况一或者情况二处理都一样的,都是往后移动一个。

这就是二分法查找,每次排除数组一半的数据.

举个例子两个数组:

长度 13/2+1 =7 要找到第7大的数就可以

1
2
A: 1 3 4 9
B: 1 2 3 4 5 6 7 8 9

第一次 比较 A[7/2-1] ? B[7/2-1]

1
2
A: 1 3 <4> 9
B: 1 2 <3> 4 5 6 7 8 9

之后数组变为以下结果,变为查找第 7 - 3 = 4大的数

1
2
A: 1 3 4 9
B: 4 5 6 7 8 9

重新比较 A[4/2-1] ? B[4/2-1]

1
2
A: 1 <3> 4 9
B: 4 <5> 6 7 8 9

出现相等了,只要改一个就可以, 查找 4 - 2 = 2大的数

1
2
A: 4 9
B: 4 5 6 7 8 9

比较 A[2/2-1] ? B[2/2-1]

1
2
A: <4> 9
B: <4> 5 6 7 8 9

相等的场景,可以任意一个,这里就选择移动A数组了, 2 - 1 = 1 大的数

1
2
A: <9>
B: <4> 5 6 7 8 9

这里已经是第1大的数了 就是最小的那个了,直接比较A[1/2-1] ? B[1/2-1] ===> A[0] ? B[0]

这里找到了 B[0]即 4

假如这个这里找的不是第7大,而是第八大,这里要继续找继续下一组数组就是

1
2
A: 9
B: 5 6 7 8 9

找到了5.

代码实现

这种场景很容易想到 用递归的方式来处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func FindK(a []int, b[]int, k int) int {

// 一个数组已经空了,直接找另一个数组的第k大的数 下标是k-1
if len(a) == 0 {
return b[k-1]
}
if len(b) == 0 {
return a[k-1]
}
// 找到了
if k == 1 {
return min(a[0],b[0])
}
// 比较两个数组的中位数
index := k / 2 - 1
if index < 0 {
index = 0
}
// index不能超过数组长度
newIndexA := min(index, len(a)-1)
newIndexB := min(index, len(b)-1)

if a[newIndexA] > b[newIndexB] { // 比较并移动新的数组
newIndexB += 1
return FindK(a, b[newIndexB:], k-newIndexB)
} else {
newIndexA += 1
return FindK(a[newIndexA:], b, k-newIndexA)
}
}


// 直接调用查找
func findMedianSortedArrays(nums1 []int, nums2 []int) float64 {
total := len(nums1) + len(nums2)
if total % 2 > 0 {
r := FindK(nums1, nums2, total/2+1) // 奇数 刚好是中间的数字
return float64(r)
}
index := total / 2 // 偶数
a := FindK(nums1, nums2, index)
b := FindK(nums1, nums2, index+1)
return float64(a + b) / 2
}

再来看下非递归的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func FindK(a []int, b[]int, k int) int {
indexA, indexB := 0,0
for {
if indexA >= len(a) || len(a) == 0 {
return b[indexB+k-1]
}
if indexB >= len(b) || len(b) == 0 {
return a[indexA+k-1]
}
if k == 1 {
return min(a[indexA], b[indexB])
}
newIndexB := min(indexB+k/2-1, len(b)-1)
newIndexA := min(indexA+k/2-1, len(a)-1)
if a[newIndexA] > b[newIndexB] {
k = k - (newIndexB-indexB+1)
indexB = newIndexB + 1
} else {
k = k - (newIndexA - indexA+1)
indexA = newIndexA + 1
}
}
}

终于,花了两个晚上 弄懂了以上一个非常简单的算法,深感智商有限,你的努力在天赋面前不值一提,你努力达到的高度,可能是别人的起点。

平凡人的努力,终究是徒劳。

redis 内存淘汰策略

redis 内存淘汰策略

将 Redis 用作缓存时, 如果内存空间用满, 就会根据配置的最大内存策略返回错误或者自动驱逐老的数据。

maxmemory 配置指令

maxmemory 用于指定 Redis 能使用的最大内存。既可以在 redis.conf 文件中设置, 也可以在运行过程中通过 CONFIG SET 命令动态修改。

例如, 要设置 100MB 的内存限制, 可以在 redis.conf 文件中这样配置

1
maxmemory 100mb

maxmemory 设置为 0, 则表示不进行内存限制。当然, 对32位系统来说有一个隐性的限制条件: 最多 3GB 内存.

内存淘汰策略(maxmemory-policy)
  • noeviction

不删除,达到最大内存,如果继续写(新增)数据,直接返回错误信息。DEL还是可以正常执行。

  • allkeys-lru

所有key范围,优先删除最近最少使用(LRU)

  • volatile-lru

只限于设置了expire的部分,优先删除最近最少使用(LRU)的key

  • allkeys-random

所有key范围,随机删除一部分key

  • volatile-random

只限于设置了expire的部分key,随机删除

  • volatile-ttl

只限于设置了expire的部门;优先删除剩余时间短的key

如果没有设置 expire 的key, 不满足先决条件(prerequisites); 那么 volatile-lru, volatile-randomvolatile-ttl 策略的行为, 和 noeviction(不删除) 基本上一致。

内存淘汰的实现

淘汰过程:

  • 客户端执行命令,导致redis中的数据增加,内存增加
  • Redis检查内存使用量,超过maxmemory限制,根据策略清除key

近似LRU算法

为了节约内存,采用了抽取少量的key样本,然后删除其中访问时间最久的key。

在 Redis 的 LRU 算法中, 可以通过设置样本(sample)的数量来调优算法精度。

1
maxmemory-samples <count>

Letcode 两数相加

leetcode 两数相加

题目
1
2
3
4
5
6
7
8
9
10
11
给出两个 非空 的链表用来表示两个非负的整数。其中,它们各自的位数是按照 逆序 的方式存储的,并且它们的每个节点只能存储 一位 数字。

如果,我们将这两个数相加起来,则会返回一个新的链表来表示它们的和。

您可以假设除了数字 0 之外,这两个数都不会以 0 开头。

示例:

输入:(2 -> 4 -> 3) + (5 -> 6 -> 4)
输出:7 -> 0 -> 8
原因:342 + 465 = 807
  1. 这道题比较简单,主要是考链表的,下面是自己写的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
# Definition for singly-linked list.
# class ListNode(object):
# def __init__(self, x):
# self.val = x
# self.next = None

class Solution(object):
def addTwoNumbers(self, l1, l2):
"""
:type l1: ListNode
:type l2: ListNode
:rtype: ListNode
"""
l3 = None
pre1 = l1
pre2 = l2
pre = l3
jin = 0 # 进位
while pre1 and pre2:
s = pre1.val + pre2.val + jin
if s >= 10:
jin = int(s / 10)
s = s % 10
else:
jin = 0
cur = ListNode(s)
if pre:
pre.next = cur
pre = pre.next
else:
l3 = cur
pre = cur
pre1 = pre1.next
pre2 = pre2.next

left = pre1 if pre1 else pre2
while left:
s = left.val + jin
if s >= 10:
jin = int(s / 10)
s = s % 10
else:
jin = 0
cur = ListNode(s)
pre.next = cur
pre = pre.next
left = left.next
if jin != 0:
cur = ListNode(jin)
pre.next = cur
return l3
  1. 下面是参考别人写的,简洁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# Definition for singly-linked list.
# class ListNode(object):
# def __init__(self, x):
# self.val = x
# self.next = None

class Solution(object):
def addTwoNumbers(self, l1, l2):
"""
:type l1: ListNode
:type l2: ListNode
:rtype: ListNode
"""
ret = ListNode(0)
cur = ret
carry = 0
while l1 or l2:
x = 0 if l1 is None else l1.val
y = 0 if l2 is None else l2.val
s = x + y + carry
carry = int(s / 10)
cur.next = ListNode(s % 10)
cur = cur.next
if l1:
l1 = l1.next
if l2:
l2 = l2.next
if carry != 0:
cur.next = ListNode(carry)
return ret.next

Letcode 两数之和

Letcode 两数之和

题目
1
2
3
4
5
6
7
8
9
10
给定一个整数数组 nums 和一个目标值 target,请你在该数组中找出和为目标值的那 两个 整数,并返回他们的数组下标。

你可以假设每种输入只会对应一个答案。但是,数组中同一个元素不能使用两遍。

示例:

给定 nums = [2, 7, 11, 15], target = 9

因为 nums[0] + nums[1] = 2 + 7 = 9
所以返回 [0, 1]
  1. 最简单的暴力解法

最容易想到的,但是内存耗时都是比较高的,这是一般人的解法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class Solution(object):
def twoSum(self, nums, target):
"""
:type nums: List[int]
:type target: int
:rtype: List[int]
"""
index = 0
ret = []
ok = False
for index in range(len(nums)):
other = target - nums[index]
start = index
while start < len(nums) - 1:
if nums[start + 1] == other:
ret.append(index)
ret.append(start + 1)
ok = True
break
start += 1
if ok:
break
return ret
  1. 简便解法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Solution(object):
def twoSum(self, nums, target):
"""
:type nums: List[int]
:type target: int
:rtype: List[int]
"""
num_map = {}
index = 0
for num in nums:
other = target - num
if num_map.get(other) is not None: # 注意0
return [num_map.get(other), index]
num_map[num] = index
index += 1
return [0, 0]

一次循环,把遍历过的每个数的差与当前数字的索引放入字典,后面如果有数字存在在字典中,则该index与字典中保存的index为结果

我真的是被自己蠢哭,花了一个小时,弄懂了两个算法;

深感有心无力,年龄大了,老程序员脑子不灵光了。