首页
关于我
Search
1
魔改comfy2go支持Https、wss以及Basic Auth
17 阅读
2
Navidrome 自建家庭音乐服务
11 阅读
3
Go实现密钥协商
9 阅读
4
雷池社区版通信机制优化
8 阅读
5
CF反代Docker镜像源,国内拉取Docker镜像
8 阅读
默认分类
Go
safeline
cloudflare
docker
HomeLab
登录
Search
标签搜索
safeline
密钥协商
typecho
comfyUI
go
闻道
累计撰写
7
篇文章
累计收到
1
条评论
首页
栏目
默认分类
Go
safeline
cloudflare
docker
HomeLab
页面
关于我
搜索到
1
篇与
的结果
2024-07-30
雷池社区版通信机制优化
雷池社区版组件结构组件名称释义MgtWeb 管理服务Mario日志服务TengineNginx 配置服务Postgres数据库存储服务FVM-Managerfvm 管理服务问题背景在某一次社区版发布了获取防护网站的 title 和icon 信息功能。结果一部分的社区用户反馈在管理页面编辑站点页面显示 「Service is abnormal ...」,排查发现是 tengine 服务发生了异常。业务逻辑由于需要获取上游服务器的信息,需要容器的网络为「host」模式,在社区版的容器中只有 Tengine 容器是符合要求的,所以发生需要获取防护网站的信息时,MGT 容器会创建一个事件,通过 GRPC Stream Client 发送给 Tengine 的 GRPC Stream Server。Tengine 则是单线程不断通过 Stream 接收事件后开始处理,完成后将结果通过 GRPC Stream 发送给 MGT 容器(Client)。排查思路根据错误日志,最后将问题范围约束在 GRPC Stream 通信这里。Tengine 通过不断循环 GRPC Stream 里面的消息处理然后再返回,这个逻辑在原有的设计上并不会发生问题,社区用户并不会频繁的修改站点相关的配置,每种事件的处理时间都控制在合理的时间内正常返回。但是加入了获取防护网站信息的事件后发生了异常,原因是 Tengine 并不一定能够成功的连通防护网站,这导致了 Tengine 在处理此轮事件花费太长的时间。在这个过程中,如果用户操作了站点相关的配置,会有一个新的线程监听stream 返回的消息,这导致了多个线程发送事件时,接受到了其他事件的结果,造成整体的通讯混乱。不过这个问题看起来并不是导致 Mgt 抛出「Service is abnormal...」的原因。分析 MGT 端的代码,发生异常时 GRPC stream server 的 Context 已经被结束了。根据代码逻辑将 GRPC stream client 置为了 Nil,后续再有线程发出事件时直接返回「Service is abnormal...」。再次分析 Tengine 的 GRPC stream server发现:当某次事件处理发生异常时返回一个 Error 信息给 client,随后关闭了 GRPC stream。等待 5 秒后再次重启 GRPC stream server。但 client 并不会尝试重连,最终造成的结果就是 MGT 接下来所有和 Tengine 相关的操作都会发生异常。至此问题的原因比较清晰了。通信机制思考重新分析了 MGT 和 Tengine 的通信需求,最终决定采用 http 通信的方式(httpRPC)来替代 GRPC 通信。这样做主要出于以下几点考虑:MGT 和 Tengine 的通信并没有很高实时性的需求MGT 和 Tengine 之间并没有需要用到GRPC流式处理的特性基于本次的问题分析,发生问题后不易找到问题原因和调试我们希望新的通信机制能够:出现问题易分析某次事件发生错误不会影响其他无关事件处理支持同步任务和异步任务通信机制优化Tengine 侧 httpRPC server 设计server 端的 httprpc 将使用单线程模型,避免使用锁机制降低代码的复杂度,串行地处理所有 MGT 的事件。Tengine 启动完成后将不断和 MGT 通过 ECDH 协商共享密钥,只有协商成功后才能进行后续的 httprpc通信。密钥协商ECDH(Elliptic Curve Diffie-Hellman)是一种密钥协商协议,用于在两个通信方之间安全地协商一个共享的密钥。这个共享密钥可以用于对称加密算法,以便进行安全通信。 ^618810密钥生成:每个通信方都生成一对椭圆曲线上的公钥和私钥。公钥用于与对方进行协商,私钥用于生成共享密钥。 ^7d59a1公钥交换:双方交换各自的公钥。这可以通过不同的方式实现,例如通过网络传输、安全信道或其他安全手段。密钥协商:双方使用对方的公钥和自己的私钥计算一个共享的密钥。这个密钥是基于椭圆曲线数学问题的难解性而计算出来的,因此即使在公开的情况下,计算出私钥也是困难的。共享密钥:双方现在都拥有相同的共享密钥,可以用于对称加密算法(如AES)进行加密和解密。server 端启动后将会定期拉取 client 积攒的事件,如果成功拉倒消息,将缩短这个时间间隔更加频繁的拉取事件。拉取到事件后通过上一步协商出的共享密钥进行解密,获得事件 ID,事件类型,事件数据,时间超时时间。根据事件的类型执行对应事件的逻辑函数:比如生成站点的 Nginx 配置、申请/续签 ACME 证书...事件处理的结果将包装成一个 TaskResult,用协商好的共享密钥加密,加密后的数据发送给 client。同时监听 client 确认收到消息的响应,正确的响应 server 将清理掉这个事件相关的信息,如果超时或 client 不在线, server 将在一段时间内会保留该事件信息,同时不断和 client 发起密钥协商请求,直到新的密钥协商完成后。MGT 侧 httpRPC client设计新的 httpRPC 仍然使用单线程模型处理 MGT 发送的所有事件同时为每个消息赋予一个事件 ID,便于后续对事件的追踪和 MGT 接收后发送给对应的线程继续处理。设置每个任务的超时时间,任务超时后,线程不再等待返回一个超时的 Error。最终对 Task的设计:type Task struct { Id string //事件 ID Cmd string //事件类型 Params *map[string]any //事件数据 Timeout *time.Duration //任务超时时间 }httpRPC 将会发送 Task 给 Server端进行处理。那么 MGT 的发送事件的线程也需要等待 Task的结果。所以这里还需要设计一个 TaskResult。这个 TaskResult 需要包含事件的处理结果和状态type TaskResult struct { Status status //状态 Data []byte //处理结果 Error string //异常 }接下来需要设计 httpRPC 的执行逻辑。为了避免多线程带来的冲突,MGT 在启动时将初始化一个 runner。创建一个事件 channel 用于接收 MGT 将要发送的事件。httpRPC 在 MGT 需要有三种行为:将要发送的事件插入到事件channel、接受到 Tengine 的事件拉取请求后弹出事件以及接收 Tengine返回的结果解密后发送到对应的线程。两种队列:同步事件队列,异步事件队列,同时还需要具备清理失效任务的能力。对于新产生的事件。包装成 Task 后httprpc需要监测这个 Task 是否超时,同步/异步,以及接收结果的 channel,httpRPC 将会把 Server 的处理结果通过这个 channel 发送给生产事件的线程。对于完成的事件,httpRPC 将会执行 complete 操作,将这个任务清理掉。对于待消费的事件,httpRPC 将等待 Server 的拉取事件操作,弹出事件并加密返回给Server。// pushTask model type httpRpc struct { syncQueue []*queuedTask asyncQueue []*queuedTask } type pushTask struct { task Task sync bool expire time.Time ch chan TaskResult } type completeTask struct { Id string Data []byte Error string } type popTask struct { ch chan *Task } for { m := <-msgCh if m.push != nil { rpc.add(*m.push) } if m.pop != nil { rpc.pop(*m.pop) } if m.result != nil { rpc.complete(*m.result) } rpc.revoke() }MGT 中大部分的事件都是同步任务,例如用户修改了站点配置场景中,线程生产事件并等待返回的结果。这部分将封装成一个 SyncCall 函数。线程将包装好的事件通过 SyncCall 函数插入到事件队列中。接下来 httpRPC 将等待 server 端的拉取事件请求。完成后 httpRPC 将 server 端的处理结果通过 SyncCall 的 rstCh发送给生产事件的线程。如果处理的时间太久,将不会再等待这个结果,返回一个超时的 Error,避免影响后面的事件。 func SyncCall(t Task) TaskResult { rstCh := make(chan TaskResult, 1) age := syncTimeout if t.Timeout != nil { age = *t.Timeout } msgCh <- message{ push: &pushTask{ task: t, sync: true, ch: rstCh, expire: time.Now().Add(age), }, } select { case r, ok := <-rstCh: if ok { return r } return TaskResult{ Status: StatusTimeout, Error: "client timeout, channel closed", } case <-time.After(age): return TaskResult{ Status: StatusTimeout, Error: "client timeout", } } }对于 MGT 的异步事件,同样需要设计一个 ASyncCall 函数来负责处理异步事件。这个函数将MGT 生产的事件插入到异步事件队列中。在社区版一个具体的场景:用户编辑站点后将会生产获取防护网站的信息事件,这个事件并没有很高的实时性要求,通过 ASyncCall 函数可以让线程立刻返回。server 处理完这个事件后将会回传防护网站信息给 Client。httpRPC 将根据 server 回传的时间处理结果及事件类型找到 MGT 中注册对此类事件的处理函数并执行。至此完成了 client 侧 httpRPC 的大部分工作,最后 httpRPC 需要一个函数来接收 server 端的事件处理结果。对于同步任务,httpRPC 直接通过 SyncCall 函数的响应接收 channel 发送给事件生产的线程。对于异步任务,因为事件生产线程已经结束,需要调用对应事件类型注册的事件结果处理函数。func register(msgCmd string, callBack func(data []byte) (interface{}, error)) { routerFunc[msgCmd] = callBack } func ReceiveMsg(content []byte) { var params ResultMessage err := json.Unmarshal(content, ¶ms) if err != nil { logger.Error(err) return } logger.Infof("TCD response:%v", params) httprpc.Complete(params.ID, []byte(params.Data), params.Error) updatedAt = time.Now() if params.Error != "" { logger.Error(params.Error) return } if _, ok := routerFunc[params.Cmd]; ok { _, err := routerFunc[params.Cmd]([]byte(params.Data)) if err != nil { logger.Error(err) return } } }总结至此完成了一次 GRPC STREAM 问题排查,以及使用新设计的 httprpc 替换 GRPC,新的 httpRPC 在社区版的环境下更加简单易用,简单意味着健壮、可控。
2024年07月30日
8 阅读
0 评论
0 点赞