0%

缓存的使用

从系统层面看,使用缓存的目的无外乎缓解 DB 压力(主要是读压力),提升服务响应速度。
引入缓存,就不可避免地引入了缓存与业务 DB 数据的一致性问题,而不同的业务场景,对数据一致性的要求也不同。
因为redis和db毕竟是两套系统,数据的一致性想要达到何种程度要根据业务场景来取舍:

  • 最终一致性分布式缓存场景
    对于业务场景对数据一致性要求不是那么高的情况下,我们可以通过队列,binlog等手段达到最终一致性的效果。
  • 强一致性分布式缓存场景
    数据库跟缓存,以Mysql跟Redis举例,毕竟是两套系统,如果要保证强一致性,势必要引入2PC或Paxos等分布式一致性协议,或者是分布式锁等等,这个在实现上是有难度的,而且一定会对性能有影响。
    而且如果真的对数据的一致性要求这么高,我们需要考虑 缓存是否真的有必要,直接读写数据库不是更好?以何种模式做到数据库跟缓存的数据强一致性,并且对系统是有提升的。

数据库和缓存的读写顺序

一般我们在操作数据库和缓存的时候,都是先读缓存,缓存没有了,去读数据库,然后写入缓存。大致步骤情况如下。

  • 过期数据:程序先从缓存中读取数据,如果没有命中,则从数据库中读取,成功之后将数据放到缓存中
  • 命中缓存:程序先从缓存中读取数据,如果命中,则直接返回
  • 更新数据:程序先更新数据库,在删除缓存

再这里,不过多讨论,数据更新的其他方式。如
先更新缓存,在更新数据,【数据库可能回滚,这个时候还是要删除缓存】
先更新数据,在更新缓存。【两个线程同时更新的情况,有可能时序上出现错乱,导致不是最新数据】
先删除缓存,在更新数据。【数据未更新完成的情况,会有老数据写入缓存】

然而,我们在执行 更新数据和过期数据重新设置缓存的,在并发情况下会出现时序问题,造成缓存写入的不一定是最新数据。

  • 当缓存失效时,同时有一个读请求和写请求或者读请求在写未完毕的过程中,此时读到的是old数据,并且由于时序原因【网络等各种情况导致的】,导致写入的操作在删除操作之后,会写入老数据。


对于此种情况,如果对数据一致性没那么敏感的情况下,我们可以考虑设置的缓存时间短一些,
在有从库情况下考虑监听binlog下情况,在数据同步后在执行一次删除缓存的操作。
或者用一个key 表明数据处在主从延迟同步的情况,需要从主库读取,如果有数据库代理中间件,在中间内做这个操作是最好的。

阅读全文 »

记fasthttp-workerpool的实现方式:
golang中官方自带的net/http 默认处理请求的方式,对于一个http请求会单独的新起一个协程去做处理。在请求量大的时候,会使goroutine数量巨大,会增加runtime层的上下文切换成本,调度负担。而fasthttp 使用了workerpool规避这些问题。

fastHTTP的请求流程的处理

fasthttp的server启动时候,主要做了

  1. 监听端口
  2. 启动wokerpool
  3. 把真正处理链接的workFun注册到协程池里
  4. 循环接受HTTP链接,然后调用wp.Serve去处理HTTP链接
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    func (s *Server) Serve(ln net.Listener) error {
    var lastOverflowErrorTime time.Time
    var lastPerIPErrorTime time.Time
    var c net.Conn
    var err error

    // 协程池大小
    maxWorkersCount := s.getConcurrency()

    s.mu.Lock()
    {
    s.ln = append(s.ln, ln)
    if s.done == nil {
    s.done = make(chan struct{})
    }

    if s.concurrencyCh == nil {
    s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    }
    }
    s.mu.Unlock()

    wp := &workerPool{
    WorkerFunc: s.serveConn, // 注册真正处理HTTP链接的函数
    MaxWorkersCount: maxWorkersCount,
    LogAllErrors: s.LogAllErrors,
    MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
    Logger: s.logger(),
    connState: s.setState,
    }
    // 启动协程池管理,start函数里会触发一个单独的协程用来清理那些过期的协程
    wp.Start()


    atomic.AddInt32(&s.open, 1)
    defer atomic.AddInt32(&s.open, -1)

    for {
    if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
    wp.Stop()
    if err == io.EOF {
    return nil
    }
    return err
    }
    s.setState(c, StateNew)
    atomic.AddInt32(&s.open, 1)
    //wp.Serve 里会从协程池里获取协程worker,然后去处理HTTP链接
    if !wp.Serve(c) {
    atomic.AddInt32(&s.open, -1)
    s.writeFastError(c, StatusServiceUnavailable,
    "The connection cannot be served because Server.Concurrency limit exceeded")
    c.Close()
    s.setState(c, StateClosed)
    if time.Since(lastOverflowErrorTime) > time.Minute {
    s.logger().Printf("The incoming connection cannot be served, because %d concurrent connections are served. "+
    "Try increasing Server.Concurrency", maxWorkersCount)
    lastOverflowErrorTime = time.Now()
    }
    if s.SleepWhenConcurrencyLimitsExceeded > 0 {
    time.Sleep(s.SleepWhenConcurrencyLimitsExceeded)
    }
    }
    c = nil
    }
    }

    workerpool主要的作用逻辑

    wokerpool主要做了以下这些事情
  5. 管理协程worker,清楚过去的worker
  6. 使用自身的属性read来存储 worker
  7. HTTP链接和worker使用channel来传递
  8. 接受来自server注册的真正处理链接的函数workFun
    自身主要结构如下图

    处理流程如下图

    workerpool主要函数及属性分析

  9. start函数
    主要单独起一个协程处理过期的worker,并且初始化了sync.pool的新增worker的方法
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    func (wp *workerPool) Start() {
    if wp.stopCh != nil {
    panic("BUG: workerPool already started")
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    // 使用sync.pool 来复用workerChan
    wp.workerChanPool.New = func() interface{} {
    return &workerChan{
    ch: make(chan net.Conn, workerChanCap),
    }
    }
    // 主要单独起一个协程处理过期的worker
    go func() {
    var scratch []*workerChan
    for {
    wp.clean(&scratch)
    select {
    case <-stopCh:
    return
    default:
    time.Sleep(wp.getMaxIdleWorkerDuration())
    }
    }
    }()
    }
  10. clean 主要是清理过期的worker
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    func (wp *workerPool) clean(scratch *[]*workerChan) {
    maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
    criticalTime := time.Now().Add(-maxIdleWorkerDuration)

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready)

    // 使用二分查找,发现过期的worker
    l, r, mid := 0, n-1, 0
    for l <= r {
    mid = (l + r) / 2
    if criticalTime.After(wp.ready[mid].lastUseTime) {
    l = mid + 1
    } else {
    r = mid - 1
    }
    }
    i := r
    if i == -1 {
    wp.lock.Unlock()
    return
    }

    *scratch = append((*scratch)[:0], ready[:i+1]...)
    m := copy(ready, ready[i+1:])
    for i = m; i < n; i++ {
    ready[i] = nil
    }
    wp.ready = ready[:m]
    wp.lock.Unlock()

    tmp := *scratch
    for i := range tmp {
    tmp[i].ch <- nil
    tmp[i] = nil
    }
    }
  11. serve 接受HTTP请求的入口
    从连接池里获取worker,然后让worker去处理,如果是新建的worker会起一个协程。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    func (wp *workerPool) Serve(c net.Conn) bool {
    // getCh会获取协程worker,如果是新建的worker会起一个协程。
    ch := wp.getCh()
    if ch == nil { //说明worker达到最大数目
    return false
    }
    // 这里使用channel,与worker里的协程通信
    ch.ch <- c
    return true
    }
    // 获取worker
    func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
    if wp.workersCount < wp.MaxWorkersCount {
    createWorker = true
    wp.workersCount++
    }
    } else {
    ch = ready[n]
    ready[n] = nil
    wp.ready = ready[:n]
    }
    wp.lock.Unlock()

    if ch == nil { // read取不到,且未达到最大,则新建worker
    if !createWorker {
    return nil
    }
    vch := wp.workerChanPool.Get()
    ch = vch.(*workerChan)
    go func() {
    wp.workerFunc(ch) // 在协程里处理链接,通过channel接受HTTP链接
    wp.workerChanPool.Put(vch)
    }()
    }
    return ch
    }
  12. workerFunc 真正处理链接的地方
    主要是一直for循环等channel里出现新的HTTP链接,然后接受HTTP链接去处理
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
    if c == nil { // 说明,自身长时间未获取到新的HTTP请求导致超时被关闭或者workerpool停止。
    break
    }

    // 使用Server注册的WorkerFunc真正处理HTTP链接
    if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
    errStr := err.Error()
    if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
    strings.Contains(errStr, "reset by peer") ||
    strings.Contains(errStr, "request headers: small read buffer") ||
    strings.Contains(errStr, "unexpected EOF") ||
    strings.Contains(errStr, "i/o timeout") ||
    errors.Is(err, ErrBadTrailer)) {
    wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
    }
    }
    if err == errHijacked {
    wp.connState(c, StateHijacked)
    } else {
    _ = c.Close()
    wp.connState(c, StateClosed)
    }
    c = nil

    // 处理完一个请求后,把自身归返到read里去
    if !wp.release(ch) {
    break
    }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
    }
阅读全文 »

rpc 的一个流程

当一个请求过来的,经过网关,网关调用应用服务,应用服务也需要调其他的服务。这个时候我们要考虑的问题。

  • 服务之间的发现
    我们需要一个发现服务和注册服务的地方,既是注册中心。还需要监控服务的可用性
  • 服务是多机器部署的,怎么均衡服务节点的流量
    负载均衡的实现,保证流量不被打到某一台或者特定的几台节点上,其他节点无流量
  • 调用超时问题
    需要关心网关调用服务,或者服务之间调用的超时问题
  • 服务负载问题
    如果某一个服务集群负载过高怎么办
  • 服务的节点管理等等
    怎么样管理和监控服务节点
  • 服务之间交互的协议
    gateway
阅读全文 »

MySQL的架构图

一条SQL语句从客户端发送到服务端,需要经历很多流程

  • 客户端需要与服务端建立连接,建立连接的时候,服务端会验证客户端的登录验证,登录用户权限等操作
  • 处理SQL语句
    登录成功后,客户端发送SQL语句,需要解析SQL,然后优化SQL语句的执行,判断不同的执行类型,让执行器执行。
    如果是操作数据的操作,还需要从磁盘加载数据到内存做相应的处理。修改类的还需要回写到磁盘,以防数据丢失。
    然后基于不同的SQL语句返回处理结果
  • 客户端基于返回结果集做处理
    mysql

分析一个update的执行流程

mysql-update

阅读全文 »

注解

注解是放在Java源码的类、方法、字段、参数前的一种特殊“注释”。是附加在代码中的一些元信息,用于编译和运行时进行解析和使用,起到说明、配置的功能。注解不会影响代码的实际逻辑,仅仅起到辅助性的作用。包含在java.lang.annotation包中。

​ 注释不会打包到class文件中,注解则可以被编译器打包进入class文件,因此,注解是一种用作标注的“元数据”。

注解大致分为三类

  • 第一类是由编译器使用的注解【这类注解不会被编译进入.class文件,它们在编译后就被编译器扔掉了。】
    阅读全文 »