go进阶-计时器
timer
可以控制时间,确保应用程序的某段代码可以在某个时刻运行。在go中可以单次执行,也可以循环执行。
例如:
fmt.Printf("time.Now().Unix(): %v\n", time.Now().Unix())
// time.Now().Unix(): 1652706497
获取Unix时间戳。
基本特性
Timer
timer := time.NewTimer(time.Second * 2)
<-timer.C // 阻塞两秒后才会继续执行
fmt.Println("done")
根据此特性可以设置:如定时两秒后执行某个程序。
func main() {
v := make(chan struct{})
timer := time.AfterFunc(time.Second*2, func() {
fmt.Println("do sth...")
v <- struct{}{} // 信号,表明go func结束
})
defer timer.Stop()
<-v
}
Ticker
到时间之后会重新计时。
如,每秒执行一次,直到十秒:
unc main() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
done := make(chan bool)
go func() {
time.Sleep(10 * time.Second)
done <- true
}()
for {
select {
case <-done:
fmt.Println("time up...")
return
case t := <-ticker.C:
fmt.Printf("t.Unix(): %v\n", t.Unix())
}
}
}
最小堆:四叉堆

父节点和子节点之间有大小关系,同辈的节点之间没有大小关系。
数据结构
暴露出来的Timer
结构体:
type Timer struct {
C <-chan Time // 接收Timer触发的事件
r runtimeTimer
}
内部runtimeTimer
:
type runtimeTimer struct {
pp uintptr // 计时器所在的处理器的指针地址
when int64 // 计时器被唤醒的时间
period int64 // 再次被唤醒的时间when + peroid
f func(any, uintptr) // 唤醒时候的回调函数
arg any // 回调函数参数
seq uintptr // 回调函数参数,仅在网络场景用
nextwhen int64 // 计时器状态为modified时,设置到when字段
status uint32 // 计时器状态
}
每个timer
存储在对应的处理器中:
type p struct {
...
timersLock mutex
timers []*timer
...
}
timers
是一个四叉堆的结构,小的在上面,先执行:

脑海中浮现出两个问题:
- 为啥是四叉堆?不是二叉堆?别的堆?
首先,四叉堆新加入节点或者对节点元素进行修改,导致元素上浮时,比较次数肯定比二叉堆少,而且而且其次,N叉堆对数据访问更加集中在数组前部分,更有利于缓存。(堆的数据结构而言,数组的话。例如刚开始是第N个,对于二叉堆而言,接下来要访问它的父节点,也就是N/2个,但是如果是四叉堆得话,就需要访问N/4个来比较)
说的也的确比较有道理,这样看是越多叉越好。那为啥不是8呢?
一位仁兄@萌叔说的感觉比较有道理:
假设是d叉堆,n个节点。
首先比较插入,节点上浮:
比较次数是(logN / logd)
,也就是logd_n。例如二叉树,16个节点,第一层1个节点,第二层2个节点,第三层4个节点,第4层8个节点,第5层1个节点。比较的次数也就是二叉堆的层数。

删除的话,是节点下沉:
以这个图为例

因为子节点之间没有关联,所以四个节点都要访问,才能找到最小的。也就是比上浮的操作每层乘以一个d。
O(d * log n / log d)

这样一比较,四叉堆是个最好的选择。
好像挺有道理。
- 堆何时变化,也就是何时把那个到了时间的头结点打掉?是遍历吗?还是时间到了会有个信号通知?
下面再说。先看看用法。
用法和实现原理
定时器状态

创建定时器
Timer用法
timer := time.NewTimer(time.Second * 2)
<-timer.C // 阻塞两秒后才会继续执行
fmt.Println("done")
Timer原理
// NewTimer creates a new Timer that will send
// the current time on its channel after at least duration d.
func NewTimer(d Duration) *Timer {
c := make(chan Time, 1)
t := &Timer{
C: c,
r: runtimeTimer{
when: when(d), //1 设置到期时间
f: sendTime, //2 设置回调函数,发送当前时间
arg: c,
},
}
startTimer(&t.r)
return t
}
根据前面数据结构提到的Timer结构体,和几个辅助函数:
// when is a helper function for setting the 'when' field of a runtimeTimer.
// It returns what the time will be, in nanoseconds, Duration d in the future.
// If d is negative, it is ignored. If the returned value would be less than
// zero because of an overflow, MaxInt64 is returned.
func when(d Duration) int64 {
if d <= 0 {
return runtimeNano()
}
t := runtimeNano() + int64(d)
if t < 0 {
// N.B. runtimeNano() and d are always positive, so addition
// (including overflow) will never result in t == 0.
t = 1<<63 - 1 // math.MaxInt64
}
return t
}
// sendTime does a non-blocking send of the current time on c.
func sendTime(c any, seq uintptr) {
select {
case c.(chan Time) <- Now():
default:
}
}
//具体函数在runtime里面
func startTimer(*runtimeTimer)
runtime/time.go
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
Ticker
func NewTicker(d Duration) *Ticker {
if d <= 0 {
panic(errors.New("non-positive interval for NewTicker"))
}
// Give the channel a 1-element time buffer.
// If the client falls behind while reading, we drop ticks
// on the floor until the client catches up.
c := make(chan Time, 1)
t := &Ticker{
C: c,
r: runtimeTimer{
when: when(d),
period: int64(d),
f: sendTime,
arg: c,
},
}
startTimer(&t.r)
return t
}
和Timer
的区别就是多了一个peroid
(再次被唤醒的时间when + peroid)。
启动定时器
刚才提到:创建定时器里面的startTimer
//具体函数在runtime里面
func startTimer(*runtimeTimer)
runtime/time.go
// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
if raceenabled {
racerelease(unsafe.Pointer(t))
}
addtimer(t)
}
具体addtimer
的行为呢?
func addtimer(t *timer) {
// ... 校验
t.status = timerWaiting // 等待计时器启动
when := t.when
// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp) // 清理计时器队列
doaddtimer(pp, t) // 添加到当前处理器的堆中
unlock(&pp.timersLock)
// 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
wakeNetPoller(when)
releasem(mp)
}
wakeNetPoller的行为还是有点迷惑。
停止计时器
timer := time.NewTicker(time.Second * 2)
go func() {
<-timer.C // 阻塞两秒后才会继续执行
fmt.Println("done")
}()
timer.Stop() // 不等定时器结束,我不想要了,stop
原理
func (t *Ticker) Stop() {
stopTimer(&t.r)
}
// 同样对应一个runtime里面的函数
func stopTimer(*runtimeTimer) bool
// runtime/time.go
// stopTimer stops a timer.
// It reports whether t was stopped before being run.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
return deltimer(t)
}
// deltimer deletes the timer t. It may be on some other P, so we can't
// actually remove it from the timers heap. We can only mark it as deleted.
// It will be removed in due course by the P whose heap it is on.
// Reports whether the timer was removed before it was run.
func deltimer(t *timer) bool {
for {
switch s := atomic.Load(&t.status); s {
case timerWaiting, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// Must fetch t.pp before changing status,
// as cleantimers in another goroutine
// can clear t.pp of a timerDeleted timer.
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerModifiedEarlier:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp := acquirem()
if atomic.Cas(&t.status, s, timerModifying) {
// Must fetch t.pp before setting status
// to timerDeleted.
tpp := t.pp.ptr()
if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer()
}
releasem(mp)
atomic.Xadd(&tpp.deletedTimers, 1)
// Timer was not yet run.
return true
} else {
releasem(mp)
}
case timerDeleted, timerRemoving, timerRemoved:
// Timer was already run.
return false
case timerRunning, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerNoStatus:
// Removing timer that was never added or
// has already been run. Also see issue 21874.
return false
case timerModifying:
// Simultaneous calls to deltimer and modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
}
并不在这里真的删除处理器上挂载的timer
节点,而是设置状态。
修改/重置计数器
timer := time.NewTicker(time.Second * 2)
go func() {
<-timer.C // 等待计时器结束再接下来执行
fmt.Println("done")
}()
time.Sleep(time.Second)
timer.Reset(time.Second * 10) // 改为十秒后执行
原理
func (t *Ticker) Reset(d Duration) {
if d <= 0 {
panic("non-positive interval for Ticker.Reset")
}
if t.r.f == nil {
panic("time: Reset called on uninitialized Ticker")
}
modTimer(&t.r, when(d), int64(d), t.r.f, t.r.arg, t.r.seq)
}
不出意外的,modTimer函数实体仍然在runtime包中:
// modtimer modifies an existing timer.
// This is called by the netpoll code or time.Ticker.Reset or time.Timer.Reset.
// Reports whether the timer was modified before it was run.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) bool {
if when <= 0 {
throw("timer when must be positive")
}
if period < 0 {
throw("timer period must be non-negative")
}
status := uint32(timerNoStatus)
wasRemoved := false
var pending bool
var mp *m
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
pending = true // timer not yet run
break loop
}
releasem(mp)
case timerNoStatus, timerRemoved:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
// Timer was already run and t is no longer in a heap.
// Act like addtimer.
if atomic.Cas(&t.status, status, timerModifying) {
wasRemoved = true
pending = false // timer already run or stopped
break loop
}
releasem(mp)
case timerDeleted:
// Prevent preemption while the timer is in timerModifying.
// This could lead to a self-deadlock. See #38070.
mp = acquirem()
if atomic.Cas(&t.status, status, timerModifying) {
atomic.Xadd(&t.pp.ptr().deletedTimers, -1)
pending = false // timer already stopped
break loop
}
releasem(mp)
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerModifying:
// Multiple simultaneous calls to modtimer.
// Wait for the other call to complete.
osyield()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved {
t.when = when
pp := getg().m.p.ptr()
lock(&pp.timersLock)
doaddtimer(pp, t)
unlock(&pp.timersLock)
if !atomic.Cas(&t.status, timerModifying, timerWaiting) {
badTimer()
}
releasem(mp)
wakeNetPoller(when)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
tpp := t.pp.ptr()
if newStatus == timerModifiedEarlier {
updateTimerModifiedEarliest(tpp, when)
}
// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
releasem(mp)
// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
return pending
}
做的还是变更timer的状态。
触发计时器
Go1.14之后,Timer归属到了各个处理器之中,这里有两部分内容:
- 通过调度器在调度时进行计时器的触发。
- 通过系统监控检查并触发计时器(到期未执行)。
调度器触发
- 调用 NewTimer,timer.After, timer.AfterFunc 生产 timer, 加入对应的 P 的堆上。
- 调用 timer.Stop, timer.Reset 改变对应的 timer 的状态。
- GMP 在调度周期内中会调用 checkTimers ,遍历该 P 的 timer 堆上的元素,根据对应 timer 的状态执行真的操作。
- 当前处理器如果没有可执行的Timer,并且没有可执行的G,那么按照调度模型,就会去窃取其他计时器和 G。
系统监控触发
即使是通过每次调度器调度和窃取的时候触发,但毕竟是具有一定的随机和不确定性。因此系统监控触发依然是一个兜底保障,在 Go 语言中 runtime.sysmon
方法承担了这一个责任。
剩余问题
wakeNetPoller的行为还是有点迷惑。例如在addTimer里面:
func addtimer(t *timer) {
// ... 校验
t.status = timerWaiting // 等待计时器启动
when := t.when
// Disable preemption while using pp to avoid changing another P's heap.
mp := acquirem()
pp := getg().m.p.ptr()
lock(&pp.timersLock)
cleantimers(pp) // 清理计时器队列
doaddtimer(pp, t) // 添加到当前处理器的堆中
unlock(&pp.timersLock)
// 中断正在阻塞的网络轮询,根据时间判断是否需要唤醒网络轮询器中休眠的线程。
wakeNetPoller(when)
releasem(mp)
}
具体是啥看下定义:
// wakeNetPoller wakes up the thread sleeping in the network poller if it isn't
// going to wake up before the when argument; or it wakes an idle P to service
// timers and the network poller if there isn't one already.
func wakeNetPoller(when int64) {
if atomic.Load64(&sched.lastpoll) == 0 {
// In findrunnable we ensure that when polling the pollUntil
// field is either zero or the time to which the current
// poll is expected to run. This can have a spurious wakeup
// but should never miss a wakeup.
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > when {
netpollBreak()
}
} else {
// There are no threads in the network poller, try to get
// one there so it can handle new timers.
if GOOS != "plan9" { // Temporary workaround - see issue #42303.
wakep()
}
}
}
唤醒网络轮询器中休眠的线程,检查计时器被唤醒的时间(when)是否在当前轮询预期运行的时间(pollerPollUntil)内,若是唤醒。
总结
其实还是有点迷惑的,但是再深入就牵扯到更深更杂的东西了,螺旋上升吧,继续努力,共勉。