【译文】原文地址
当我们需要在特定时间、或一定间隔时间运行任务时,需要事件调度系统来负责运行类似发送邮件、推送通知、半夜关闭帐户和清楚表格等任务。
本文我们将创建基础事件调度器,可以在一段时间后调度事件。使用数据库作为持久化层可以让我们理解事件调度系统是如何工作的。基本工作机制是:任何时候需要调度事件时,被调度job就会添加到数据库,并在给定时间执行。另一个任务将定期查询数据库确认是否有过期任务,如果过期就会执行对应的事件任务。
下面从创建要存储事件的数据库表开始。
CREATE TABLE IF NOT EXISTS "public"."jobs" (
"id" SERIAL PRIMARY KEY,
"name" varchar(50) NOT NULL,
"payload" text,
"runAt" TIMESTAMP NOT NULL
)
接下来定义数据结构:
- Event:需要调度的事件
- Listeners:事件监听列表
- ListenFun:事件触发后要执行的任务
// Listeners has attached event listeners
type Listeners map[string]ListenFunc
// ListenFunc function that listens to events
type ListenFunc func(string)
// Event structure
type Event struct {
ID uint
Name string
Payload string
}
下面定义调度器结构,将用于调度事件和运行listeners。
// Scheduler data structure
type Scheduler struct {
db *sql.DB
listeners Listeners
}
// NewScheduler creates a new scheduler
func NewScheduler(db *sql.DB, listeners Listeners) Scheduler {
return Scheduler{
db: db,
listeners: listeners,
}
}
以上代码通过传入sql.DB实例和初始化listeners来创建scheduler。接下来,我们需要添加调度函数来插入事件到job表中,如下所示:
// Schedule sechedules the provided events
func (s Scheduler) Schedule(event string, payload string, runAt time.Time) {
log.Print("? Scheduling event ", event, " to run at ", runAt)
_, err := s.db.Exec(`INSERT INTO "public"."jobs" ("name", "payload", "runAt") VALUES ($1, $2, $3)`, event, payload, runAt)
if err != nil {
log.Print("schedule insert error: ", err)
}
}
// AddListener adds the listener function to Listeners
func (s Scheduler) AddListener(event string, listenFunc ListenFunc) {
s.listeners[event] = listenFunc
}
这里在AddListener函数中我们仅将listener函数赋给事件名。
上面实现了代码的第一部分添加事件到表中。下面我们需要从数据中获取过期job,执行并删除掉。下面的函数实现展示了如何检查表中过期事件并将其序列化到Event结构体中。
// checkDueEvents checks and returns due events
func (s Scheduler) checkDueEvents() []Event {
events := []Event{}
rows, err := s.db.Query(`SELECT "id", "name", "payload" FROM "public"."jobs" WHERE "runAt" < $1`, time.Now())
if err != nil {
log.Print("? error: ", err)
return nil
}
for rows.Next() {
evt := Event{}
rows.Scan(&evt.ID, &evt.Name, &evt.Payload)
events = append(events, evt)
}
return events
}
代码第二部分调度表中注册的事件listeners:
// callListeners calls the event listener of provided event
func (s Scheduler) callListeners(event Event) {
eventFn, ok := s.listeners[event.Name]
if ok {
go eventFn(event.Payload)
_, err := s.db.Exec(`DELETE FROM "public"."jobs" WHERE "id" = $1`, event.ID)
if err != nil {
log.Print("? error: ", err)
}
} else {
log.Print("? error: couldn't find event listeners attached to ", event.Name)
}
}
这里我们检查事件函数是否存在,如果存在就执行函数。然后从数据库删除执行过的事件。最后一部分实现是在给定时间段去检查是否有过期事件需要执行。我们使用time包中的ticker函数来实现定期检查,ticker会在给定的时间向channel中发送数据。
// CheckEventsInInterval checks the event in given interval
func (s Scheduler) CheckEventsInInterval(ctx context.Context, duration time.Duration) {
ticker := time.NewTicker(duration)
go func() {
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
log.Println("⏰ Ticks Received...")
events := s.checkDueEvents()
for _, e := range events {
s.callListeners(e)
}
}
}
}()
}
使用select来检查context是否关闭或者ticker通道是否接收到数据。定时器接收到数据,就检查到期events,并执行对应的函数。
下面给出main函数:
package main
import (
"context"
"log"
"os"
"os/signal"
"time"
"github.com/dipeshdulal/event-scheduling/customevents"
)
var eventListeners = Listeners{
"SendEmail": customevents.SendEmail,
"PayBills": customevents.PayBills,
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
interrupt := make(chan os.Signal, 1)
signal.Notify(interrupt, os.Interrupt)
db := initDBConnection()
scheduler := NewScheduler(db, eventListeners)
scheduler.CheckEventsInInterval(ctx, time.Minute)
scheduler.Schedule("SendEmail", "mail: nilkantha.dipesh@gmail.com", time.Now().Add(1*time.Minute))
scheduler.Schedule("PayBills", "paybills: $4,000 bill", time.Now().Add(2*time.Minute))
go func() {
for range interrupt {
log.Println("\n❌ Interrupt received closing...")
cancel()
}
}()
<-ctx.Done()
}
输出为:
2021/01/16 11:58:49 ? Seeding database with table...
2021/01/16 11:58:49 ? Scheduling event SendEmail to run at 2021-01-16 11:59:49.344904505 +0545 +0545 m=+60.004623549
2021/01/16 11:58:49 ? Scheduling event PayBills to run at 2021-01-16 12:00:49.34773798 +0545 +0545 m=+120.007457039
2021/01/16 11:59:49 ⏰ Ticks Received...
2021/01/16 11:59:49 ? Sending email with data: mail: nilkantha.dipesh@gmail.com
2021/01/16 12:00:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ⏰ Ticks Received...
2021/01/16 12:01:49 ? Pay me a bill: paybills: $4,000 bill
2021/01/16 12:02:49 ⏰ Ticks Received...
2021/01/16 12:03:49 ⏰ Ticks Received...
^C2021/01/16 12:03:57
❌ Interrupt received closing...
从输出中,我们可以看到事件SendEmail在一分钟后触发,PayBills在两分钟后触发。
使用这种方法,我们创建了基本的事件调度系统,在一定间隔时间后调度事件。完整代码点击链接。
该用例仅展示了事件调度的实现,关于两个间隔时间重叠的处理没有说明,以及如何轮询。我们可以使用rabbitmq、kafka来实现可扩展的事件调度。