雷池社区版通信机制优化
标签搜索
侧边栏壁纸
  • 累计撰写 7 篇文章
  • 累计收到 1 条评论

雷池社区版通信机制优化

lxp
lxp
2024-07-30 / 0 评论 / 8 阅读 / 正在检测是否收录...

雷池社区版组件结构

组件名称释义
MgtWeb 管理服务
Mario日志服务
TengineNginx 配置服务
Postgres数据库存储服务
FVM-Managerfvm 管理服务

问题背景

  • 在某一次社区版发布了获取防护网站的 title 和icon 信息功能。结果一部分的社区用户反馈在管理页面编辑站点页面显示 「Service is abnormal ...」,排查发现是 tengine 服务发生了异常。

业务逻辑

  • 由于需要获取上游服务器的信息,需要容器的网络为「host」模式,在社区版的容器中只有 Tengine 容器是符合要求的,所以发生需要获取防护网站的信息时,MGT 容器会创建一个事件,通过 GRPC Stream Client 发送给 Tengine 的 GRPC Stream Server。Tengine 则是单线程不断通过 Stream 接收事件后开始处理,完成后将结果通过 GRPC Stream 发送给 MGT 容器(Client)。

排查思路

  • 根据错误日志,最后将问题范围约束在 GRPC Stream 通信这里。Tengine 通过不断循环 GRPC Stream 里面的消息处理然后再返回,这个逻辑在原有的设计上并不会发生问题,社区用户并不会频繁的修改站点相关的配置,每种事件的处理时间都控制在合理的时间内正常返回。但是加入了获取防护网站信息的事件后发生了异常,原因是 Tengine 并不一定能够成功的连通防护网站,这导致了 Tengine 在处理此轮事件花费太长的时间。在这个过程中,如果用户操作了站点相关的配置,会有一个新的线程监听stream 返回的消息,这导致了多个线程发送事件时,接受到了其他事件的结果,造成整体的通讯混乱。不过这个问题看起来并不是导致 Mgt 抛出「Service is abnormal...」的原因。
  • 分析 MGT 端的代码,发生异常时 GRPC stream server 的 Context 已经被结束了。根据代码逻辑将 GRPC stream client 置为了 Nil,后续再有线程发出事件时直接返回「Service is abnormal...」。再次分析 Tengine 的 GRPC stream server发现:当某次事件处理发生异常时返回一个 Error 信息给 client,随后关闭了 GRPC stream。等待 5 秒后再次重启 GRPC stream server。但 client 并不会尝试重连,最终造成的结果就是 MGT 接下来所有和 Tengine 相关的操作都会发生异常。至此问题的原因比较清晰了。

通信机制思考

  • 重新分析了 MGT 和 Tengine 的通信需求,最终决定采用 http 通信的方式(httpRPC)来替代 GRPC 通信。这样做主要出于以下几点考虑:

    • MGT 和 Tengine 的通信并没有很高实时性的需求
    • MGT 和 Tengine 之间并没有需要用到GRPC流式处理的特性
    • 基于本次的问题分析,发生问题后不易找到问题原因和调试
  • 我们希望新的通信机制能够:

    • 出现问题易分析
    • 某次事件发生错误不会影响其他无关事件处理
    • 支持同步任务和异步任务

通信机制优化

Tengine 侧 httpRPC server 设计

  • server 端的 httprpc 将使用单线程模型,避免使用锁机制降低代码的复杂度,串行地处理所有 MGT 的事件。Tengine 启动完成后将不断和 MGT 通过 ECDH 协商共享密钥,只有协商成功后才能进行后续的 httprpc通信。

    密钥协商

  • ECDH(Elliptic Curve Diffie-Hellman)是一种密钥协商协议,用于在两个通信方之间安全地协商一个共享的密钥。这个共享密钥可以用于对称加密算法,以便进行安全通信。 ^618810

    • 密钥生成:每个通信方都生成一对椭圆曲线上的公钥和私钥。公钥用于与对方进行协商,私钥用于生成共享密钥。 ^7d59a1
    • 公钥交换:双方交换各自的公钥。这可以通过不同的方式实现,例如通过网络传输、安全信道或其他安全手段。
    • 密钥协商:双方使用对方的公钥和自己的私钥计算一个共享的密钥。这个密钥是基于椭圆曲线数学问题的难解性而计算出来的,因此即使在公开的情况下,计算出私钥也是困难的。
    • 共享密钥:双方现在都拥有相同的共享密钥,可以用于对称加密算法(如AES)进行加密和解密。
  • server 端启动后将会定期拉取 client 积攒的事件,如果成功拉倒消息,将缩短这个时间间隔更加频繁的拉取事件。拉取到事件后通过上一步协商出的共享密钥进行解密,获得事件 ID,事件类型,事件数据,时间超时时间。根据事件的类型执行对应事件的逻辑函数:比如生成站点的 Nginx 配置、申请/续签 ACME 证书...
  • 事件处理的结果将包装成一个 TaskResult,用协商好的共享密钥加密,加密后的数据发送给 client。同时监听 client 确认收到消息的响应,正确的响应 server 将清理掉这个事件相关的信息,如果超时或 client 不在线, server 将在一段时间内会保留该事件信息,同时不断和 client 发起密钥协商请求,直到新的密钥协商完成后。

MGT 侧 httpRPC client设计

  • 新的 httpRPC 仍然使用单线程模型处理 MGT 发送的所有事件同时为每个消息赋予一个事件 ID,便于后续对事件的追踪和 MGT 接收后发送给对应的线程继续处理。设置每个任务的超时时间,任务超时后,线程不再等待返回一个超时的 Error。最终对 Task的设计:

    type Task struct {
      Id      string          //事件 ID
      Cmd     string          //事件类型
      Params  *map[string]any //事件数据
      Timeout *time.Duration  //任务超时时间
    }
  • httpRPC 将会发送 Task 给 Server端进行处理。那么 MGT 的发送事件的线程也需要等待 Task的结果。所以这里还需要设计一个 TaskResult。这个 TaskResult 需要包含事件的处理结果和状态

    type TaskResult struct {
      Status status          //状态
      Data   []byte          //处理结果
      Error  string          //异常
    }
  • 接下来需要设计 httpRPC 的执行逻辑。为了避免多线程带来的冲突,MGT 在启动时将初始化一个 runner。创建一个事件 channel 用于接收 MGT 将要发送的事件。httpRPC 在 MGT 需要有三种行为:将要发送的事件插入到事件channel、接受到 Tengine 的事件拉取请求后弹出事件以及接收 Tengine返回的结果解密后发送到对应的线程。两种队列:同步事件队列,异步事件队列,同时还需要具备清理失效任务的能力。

    • 对于新产生的事件。包装成 Task 后httprpc需要监测这个 Task 是否超时,同步/异步,以及接收结果的 channel,httpRPC 将会把 Server 的处理结果通过这个 channel 发送给生产事件的线程。
    • 对于完成的事件,httpRPC 将会执行 complete 操作,将这个任务清理掉。
    • 对于待消费的事件,httpRPC 将等待 Server 的拉取事件操作,弹出事件并加密返回给Server。

      // pushTask model
      
      type httpRpc struct {
      syncQueue  []*queuedTask
      asyncQueue []*queuedTask
      }
      
      type pushTask struct {
      task   Task
      sync   bool
      expire time.Time
      ch     chan TaskResult
      }
      type completeTask struct {
      Id    string
      Data  []byte
      Error string
      }
      
      type popTask struct {
      ch chan *Task
      }
      
      
      for {
      m := <-msgCh
      if m.push != nil {
          rpc.add(*m.push)
      }
      if m.pop != nil {
          rpc.pop(*m.pop)
      }
      if m.result != nil {
          rpc.complete(*m.result)
      }
      rpc.revoke()
      }
  • MGT 中大部分的事件都是同步任务,例如用户修改了站点配置场景中,线程生产事件并等待返回的结果。这部分将封装成一个 SyncCall 函数。线程将包装好的事件通过 SyncCall 函数插入到事件队列中。接下来 httpRPC 将等待 server 端的拉取事件请求。完成后 httpRPC 将 server 端的处理结果通过 SyncCall 的 rstCh发送给生产事件的线程。如果处理的时间太久,将不会再等待这个结果,返回一个超时的 Error,避免影响后面的事件。

    
    func SyncCall(t Task) TaskResult {
      rstCh := make(chan TaskResult, 1)
    
      age := syncTimeout
      if t.Timeout != nil {
          age = *t.Timeout
      }
    
      msgCh <- message{
          push: &pushTask{
              task:   t,
              sync:   true,
              ch:     rstCh,
              expire: time.Now().Add(age),
          },
      }
    
      select {
      case r, ok := <-rstCh:
          if ok {
              return r
          }
          return TaskResult{
              Status: StatusTimeout,
              Error:  "client timeout, channel closed",
          }
      case <-time.After(age):
          return TaskResult{
              Status: StatusTimeout,
              Error:  "client timeout",
          }
      }
    }
  • 对于 MGT 的异步事件,同样需要设计一个 ASyncCall 函数来负责处理异步事件。这个函数将MGT 生产的事件插入到异步事件队列中。在社区版一个具体的场景:用户编辑站点后将会生产获取防护网站的信息事件,这个事件并没有很高的实时性要求,通过 ASyncCall 函数可以让线程立刻返回。server 处理完这个事件后将会回传防护网站信息给 Client。httpRPC 将根据 server 回传的时间处理结果及事件类型找到 MGT 中注册对此类事件的处理函数并执行。
  • 至此完成了 client 侧 httpRPC 的大部分工作,最后 httpRPC 需要一个函数来接收 server 端的事件处理结果。对于同步任务,httpRPC 直接通过 SyncCall 函数的响应接收 channel 发送给事件生产的线程。对于异步任务,因为事件生产线程已经结束,需要调用对应事件类型注册的事件结果处理函数。

    func register(msgCmd string, callBack func(data []byte) (interface{}, error)) {
      routerFunc[msgCmd] = callBack
    }
    
    func ReceiveMsg(content []byte) {
      var params ResultMessage
      err := json.Unmarshal(content, &params)
      if err != nil {
          logger.Error(err)
          return
      }
      logger.Infof("TCD response:%v", params)
      httprpc.Complete(params.ID, []byte(params.Data), params.Error)
      updatedAt = time.Now()
      if params.Error != "" {
          logger.Error(params.Error)
          return
      }
      if _, ok := routerFunc[params.Cmd]; ok {
          _, err := routerFunc[params.Cmd]([]byte(params.Data))
          if err != nil {
              logger.Error(err)
              return
          }
      }
    }

    总结

  • 至此完成了一次 GRPC STREAM 问题排查,以及使用新设计的 httprpc 替换 GRPC,新的 httpRPC 在社区版的环境下更加简单易用,简单意味着健壮、可控。
0

评论

博主关闭了当前页面的评论