diff --git a/common/utils/cronex/LICENSE b/common/utils/cronex/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/common/utils/cronex/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/common/utils/cronex/README.md b/common/utils/cronex/README.md new file mode 100644 index 00000000..ff6846fb --- /dev/null +++ b/common/utils/cronex/README.md @@ -0,0 +1,173 @@ +# cronex +高性能cron库,相比目前使用得最多的cron,只是优化了性能。 + +# 特性 +* 继承robfig/cron全部的解析器代码 +* 优化调度相关性能 + +# cpu占用对比(越低越好) +![cronex.png](https://github.com/guonaihong/images/blob/master/cronex/cronex.png) +测试代码位置 https://github.com/guonaihong/crontest +# 快速开始 +```go +import( + "github.com/antlabs/cronex" +) + +func main() { + cron := cronex.New() + cron.AddFunc("* * * * * *", func() { + //TODO + }) + cron.Run() //开启阻塞消费者循环,如果要异步就用cron.Start() +} +``` + +# 关闭任务 +```go +import( + "github.com/antlabs/cronex" +) + +func main() { + cron := cronex.New() + tm, err := cron.AddFunc("* * * * * *", func() { + //TODO + }) + if err != nil { + return + } + tm.Stop() //删除这个任务 + cron.Run() //开启阻塞消费者循环,如果要异步就用cron.Start() +} +``` +## timer +[![Go](https://github.com/antlabs/timer/workflows/Go/badge.svg)](https://github.com/antlabs/timer/actions) +[![codecov](https://codecov.io/gh/antlabs/timer/branch/master/graph/badge.svg)](https://codecov.io/gh/antlabs/timer) + +timer是高性能定时器库 +## feature +* 支持一次性定时器 +* 支持周期性定时器 +* 支持多种数据结构后端,最小堆,5级时间轮 + +## 一次性定时器 +```go +import ( + "github.com/antlabs/timer" + "log" +) + +func main() { + tm := timer.NewTimer() + + tm.AfterFunc(1*time.Second, func() { + log.Printf("after\n") + }) + + tm.AfterFunc(10*time.Second, func() { + log.Printf("after\n") + }) + tm.Run() +} +``` +## 周期性定时器 +```go +import ( + "github.com/antlabs/timer" + "log" +) + +func main() { + tm := timer.NewTimer() + + tm.ScheduleFunc(1*time.Second, func() { + log.Printf("schedule\n") + }) + + tm.Run() +} +``` +## 自定义周期性定时器 +实现时间翻倍定时的例子 +```go +type curstomTest struct { + count int +} +// 只要实现Next接口就行 +func (c *curstomTest) Next(now time.Time) (rv time.Time) { + rv = now.Add(time.Duration(c.count) * time.Millisecond * 10) + c.count++ + return +} + +func main() { + tm := timer.NewTimer(timer.WithMinHeap()) + node := tm.CustomFunc(&curstomTest{count: 1}, func() { + log.Printf("%v\n", time.Now()) + }) + tm.Run() +} +``` +## 取消某一个定时器 +```go +import ( + "log" + "time" + + "github.com/antlabs/timer" +) + +func main() { + + tm := timer.NewTimer() + + // 只会打印2 time.Second + tm.AfterFunc(2*time.Second, func() { + log.Printf("2 time.Second") + }) + + // tk3 会被 tk3.Stop()函数调用取消掉 + tk3 := tm.AfterFunc(3*time.Second, func() { + log.Printf("3 time.Second") + }) + + tk3.Stop() //取消tk3 + + tm.Run() +} +``` +## 选择不同的的数据结构 +```go +import ( + "github.com/antlabs/timer" + "log" +) + +func main() { + tm := timer.NewTimer(timer.WithMinHeap())// 选择最小堆,默认时间轮 +} +``` +## benchmark + +github.com/antlabs/timer 性能最高 +``` +goos: linux +goarch: amd64 +pkg: benchmark +Benchmark_antlabs_Timer_AddTimer/N-1m-16 9177537 124 ns/op +Benchmark_antlabs_Timer_AddTimer/N-5m-16 10152950 128 ns/op +Benchmark_antlabs_Timer_AddTimer/N-10m-16 9955639 127 ns/op +Benchmark_RussellLuo_Timingwheel_AddTimer/N-1m-16 5316916 222 ns/op +Benchmark_RussellLuo_Timingwheel_AddTimer/N-5m-16 5848843 218 ns/op +Benchmark_RussellLuo_Timingwheel_AddTimer/N-10m-16 5872621 231 ns/op +Benchmark_ouqiang_Timewheel/N-1m-16 720667 1622 ns/op +Benchmark_ouqiang_Timewheel/N-5m-16 807018 1573 ns/op +Benchmark_ouqiang_Timewheel/N-10m-16 666183 1557 ns/op +Benchmark_Stdlib_AddTimer/N-1m-16 8031864 144 ns/op +Benchmark_Stdlib_AddTimer/N-5m-16 8437442 151 ns/op +Benchmark_Stdlib_AddTimer/N-10m-16 8080659 167 ns/op + +``` +* 压测代码位于 +https://bgithub.xyz/junelabs/timer-benchmark \ No newline at end of file diff --git a/common/utils/cronex/cronex.go b/common/utils/cronex/cronex.go new file mode 100644 index 00000000..dd1b2092 --- /dev/null +++ b/common/utils/cronex/cronex.go @@ -0,0 +1,52 @@ +// guonaihong apache 2.0 + +package cronex + +import ( + "github.com/antlabs/timer" +) + +type TimerNoder = timer.TimeNoder +type Option = timer.Option + +// cronex +type Cronex struct { + timer.Timer +} + +// 初始化一个cronex +func New(opt ...Option) *Cronex { + if len(opt) == 0 { + opt = append(opt, timer.WithMinHeap()) + } + return &Cronex{ + Timer: timer.NewTimer(opt...), + } +} + +// 添加函数 +func (c *Cronex) AddFunc(spec string, cmd func()) (node TimerNoder, err error) { + var schedule timer.Next + schedule, err = standardParser.Parse(spec) + if err != nil { + return + } + + return c.CustomFunc(schedule, cmd), nil +} + +// 运行消费者循环 +func (c *Cronex) Run() { + c.Run() +} + +// 异步运行消费者循环 +func (c *Cronex) Start() { + //ants.Submit(c.Run) + go c.Run() +} + +// 关闭cronex的任务循环 +func (c *Cronex) Stop() { + c.Stop() +} diff --git a/common/utils/cronex/cronex_test.go b/common/utils/cronex/cronex_test.go new file mode 100644 index 00000000..86b91dca --- /dev/null +++ b/common/utils/cronex/cronex_test.go @@ -0,0 +1,112 @@ +package cronex + +import ( + "fmt" + "sync/atomic" + "testing" + "time" + + "github.com/antlabs/timer" +) + +func Test_Cronex(t *testing.T) { + table := []string{ + "*/1 * * * * *", //每秒执行一次 + } + + cron := New() + cron.Start() //运行事件循环 + + count := 2 + durationChan := make(chan time.Duration, count) + now := time.Now() + var tm TimerNoder + var err error + + for _, tc := range table { + tm, err = cron.AddFunc(tc, func() { + + durationChan <- time.Since(now) + }) + if err != nil { + t.Logf("err(%v)", err) + return + } + } + + // 3s之后关闭 + go func() { + time.Sleep(time.Second * 3) + tm.Stop() + cron.Stop() + close(durationChan) + }() + + count = 0 + first := time.Duration(0) + for tv := range durationChan { + if first == 0 { + first = tv + continue + } + + left := first + time.Duration(count)*time.Second + right := first + time.Duration(1.2*float64(time.Duration(count)*time.Second)) + + if tv < left || tv > right { + t.Logf("count(%d), tv(%v), tv < left(%v) || tv > right(%v)", count, tv, left, right) + return + } + count++ + } + + if count != 1 { + t.Logf("count(%d), count != 1, callback 没有调用", count) + return + } +} + +// 测试下Next函数的时间可正确 +func Test_Cronex_ParseNext(t *testing.T) { + + var schedule timer.Next + schedule, err := standardParser.Parse("* * * * * *") + if err != nil { + t.Logf("err(%v)", err) + return + } + + first := time.Duration(0) + for count := 1; count < 4; count++ { + now := schedule.Next(time.Now()) + left := first + time.Duration(0.8*float64(time.Second)) + right := first + time.Duration(1.2*float64(time.Second)) + + tv := now.Sub(time.Now()) + if tv < left || tv > right { + t.Logf("tv(%v), tv < left(%v) || tv > right(%v)", tv, left, right) + return + } + } +} + +// 多次运行的例子 +func Test_Multiple(t *testing.T) { + cron := New() + count := int32(0) + + cron.Start() + max := int32(10) + for i := int32(0); i < max; i++ { + cron.AddFunc("* * * * * *", func() { + fmt.Printf("Every Second") + atomic.AddInt32(&count, 1) + }) + } + + time.Sleep(time.Duration(1.1 * float64(time.Second))) + cron.Stop() + if count != max { + t.Errorf("expected %d, got %d", max, count) + } +} diff --git a/common/utils/cronex/go.mod b/common/utils/cronex/go.mod new file mode 100644 index 00000000..ba50653f --- /dev/null +++ b/common/utils/cronex/go.mod @@ -0,0 +1,7 @@ +module github.com/antlabs/cronex + +go 1.19 + +require github.com/antlabs/timer v0.1.4 + +require github.com/antlabs/stl v0.0.2 // indirect diff --git a/common/utils/cronex/go.sum b/common/utils/cronex/go.sum new file mode 100644 index 00000000..8fa544f2 --- /dev/null +++ b/common/utils/cronex/go.sum @@ -0,0 +1,4 @@ +github.com/antlabs/stl v0.0.2 h1:sna1AXR5yIkNE9lWhCcKbheFJSVfCa3vugnGyakI79s= +github.com/antlabs/stl v0.0.2/go.mod h1:kKrO4xrn9cfS1mJVo+/BqePZjAYMXqD0amGF2Ouq7ac= +github.com/antlabs/timer v0.1.4 h1:MHdE00MDnNfhJCmqSOdLXs35uGNwfkMwfbynxrGmQ1c= +github.com/antlabs/timer v0.1.4/go.mod h1:mpw4zlD5KVjstEyUDp43DGLWsY076Mdo4bS78NTseRE= diff --git a/common/utils/cronex/parser.go b/common/utils/cronex/parser.go new file mode 100644 index 00000000..6398ffb4 --- /dev/null +++ b/common/utils/cronex/parser.go @@ -0,0 +1,655 @@ +//Copyright (C) 2012 Rob Figueiredo +//2022 guonaihong, +// 1.修改内容,裁剪代码只保留一个干净的解析器 +// 2.默认让parse支持到秒级别 +//All Rights Reserved. +package cronex + +import ( + "fmt" + "math" + "strconv" + "strings" + "time" +) + +// Configuration options for creating a parser. Most options specify which +// fields should be included, while others enable features. If a field is not +// included the parser will assume a default value. These options do not change +// the order fields are parse in. +type ParseOption int + +const ( + Second ParseOption = 1 << iota // Seconds field, default 0 + SecondOptional // Optional seconds field, default 0 + Minute // Minutes field, default 0 + Hour // Hours field, default 0 + Dom // Day of month field, default * + Month // Month field, default * + Dow // Day of week field, default * + DowOptional // Optional day of week field, default * + Descriptor // Allow descriptors such as @monthly, @weekly, etc. +) + +var places = []ParseOption{ + Second, + Minute, + Hour, + Dom, + Month, + Dow, +} + +var defaults = []string{ + "0", + "0", + "0", + "*", + "*", + "*", +} + +// A custom Parser that can be configured. +type Parser struct { + options ParseOption +} + +// NewParser creates a Parser with custom options. +// +// It panics if more than one Optional is given, since it would be impossible to +// correctly infer which optional is provided or missing in general. +// +// Examples +// +// // Standard parser without descriptors +// specParser := NewParser(Minute | Hour | Dom | Month | Dow) +// sched, err := specParser.Parse("0 0 15 */3 *") +// +// // Same as above, just excludes time fields +// specParser := NewParser(Dom | Month | Dow) +// sched, err := specParser.Parse("15 */3 *") +// +// // Same as above, just makes Dow optional +// specParser := NewParser(Dom | Month | DowOptional) +// sched, err := specParser.Parse("15 */3") +// +func NewParser(options ParseOption) Parser { + optionals := 0 + if options&DowOptional > 0 { + optionals++ + } + if options&SecondOptional > 0 { + optionals++ + } + if optionals > 1 { + panic("multiple optionals may not be configured") + } + return Parser{options} +} + +// Parse returns a new crontab schedule representing the given spec. +// It returns a descriptive error if the spec is not valid. +// It accepts crontab specs and features configured by NewParser. +func (p Parser) Parse(spec string) (Schedule, error) { + if len(spec) == 0 { + return nil, fmt.Errorf("empty spec string") + } + + // Extract timezone if present + var loc = time.Local + if strings.HasPrefix(spec, "TZ=") || strings.HasPrefix(spec, "CRON_TZ=") { + var err error + i := strings.Index(spec, " ") + eq := strings.Index(spec, "=") + if loc, err = time.LoadLocation(spec[eq+1 : i]); err != nil { + return nil, fmt.Errorf("provided bad location %s: %v", spec[eq+1:i], err) + } + spec = strings.TrimSpace(spec[i:]) + } + + // Handle named schedules (descriptors), if configured + if strings.HasPrefix(spec, "@") { + if p.options&Descriptor == 0 { + return nil, fmt.Errorf("parser does not accept descriptors: %v", spec) + } + return parseDescriptor(spec, loc) + } + + // Split on whitespace. + fields := strings.Fields(spec) + + // Validate & fill in any omitted or optional fields + var err error + fields, err = normalizeFields(fields, p.options) + if err != nil { + return nil, err + } + + field := func(field string, r bounds) uint64 { + if err != nil { + return 0 + } + var bits uint64 + bits, err = getField(field, r) + return bits + } + + var ( + second = field(fields[0], seconds) + minute = field(fields[1], minutes) + hour = field(fields[2], hours) + dayofmonth = field(fields[3], dom) + month = field(fields[4], months) + dayofweek = field(fields[5], dow) + ) + if err != nil { + return nil, err + } + + return &SpecSchedule{ + Second: second, + Minute: minute, + Hour: hour, + Dom: dayofmonth, + Month: month, + Dow: dayofweek, + Location: loc, + }, nil +} + +// normalizeFields takes a subset set of the time fields and returns the full set +// with defaults (zeroes) populated for unset fields. +// +// As part of performing this function, it also validates that the provided +// fields are compatible with the configured options. +func normalizeFields(fields []string, options ParseOption) ([]string, error) { + // Validate optionals & add their field to options + optionals := 0 + if options&SecondOptional > 0 { + options |= Second + optionals++ + } + if options&DowOptional > 0 { + options |= Dow + optionals++ + } + if optionals > 1 { + return nil, fmt.Errorf("multiple optionals may not be configured") + } + + // Figure out how many fields we need + max := 0 + for _, place := range places { + if options&place > 0 { + max++ + } + } + min := max - optionals + + // Validate number of fields + if count := len(fields); count < min || count > max { + if min == max { + return nil, fmt.Errorf("expected exactly %d fields, found %d: %s", min, count, fields) + } + return nil, fmt.Errorf("expected %d to %d fields, found %d: %s", min, max, count, fields) + } + + // Populate the optional field if not provided + if min < max && len(fields) == min { + switch { + case options&DowOptional > 0: + fields = append(fields, defaults[5]) // TODO: improve access to default + case options&SecondOptional > 0: + fields = append([]string{defaults[0]}, fields...) + default: + return nil, fmt.Errorf("unknown optional field") + } + } + + // Populate all fields not part of options with their defaults + n := 0 + expandedFields := make([]string, len(places)) + copy(expandedFields, defaults) + for i, place := range places { + if options&place > 0 { + expandedFields[i] = fields[n] + n++ + } + } + return expandedFields, nil +} + +var standardParser = NewParser( + SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor, +) + +// ParseStandard returns a new crontab schedule representing the given +// standardSpec (https://en.wikipedia.org/wiki/Cron). It requires 5 entries +// representing: minute, hour, day of month, month and day of week, in that +// order. It returns a descriptive error if the spec is not valid. +// +// It accepts +// - Standard crontab specs, e.g. "* * * * ?" +// - Descriptors, e.g. "@midnight", "@every 1h30m" +func ParseStandard(standardSpec string) (Schedule, error) { + return standardParser.Parse(standardSpec) +} + +// getField returns an Int with the bits set representing all of the times that +// the field represents or error parsing field value. A "field" is a comma-separated +// list of "ranges". +func getField(field string, r bounds) (uint64, error) { + var bits uint64 + ranges := strings.FieldsFunc(field, func(r rune) bool { return r == ',' }) + for _, expr := range ranges { + bit, err := getRange(expr, r) + if err != nil { + return bits, err + } + bits |= bit + } + return bits, nil +} + +// getRange returns the bits indicated by the given expression: +// number | number "-" number [ "/" number ] +// or error parsing range. +func getRange(expr string, r bounds) (uint64, error) { + var ( + start, end, step uint + rangeAndStep = strings.Split(expr, "/") + lowAndHigh = strings.Split(rangeAndStep[0], "-") + singleDigit = len(lowAndHigh) == 1 + err error + ) + + var extra uint64 + if lowAndHigh[0] == "*" || lowAndHigh[0] == "?" { + start = r.min + end = r.max + extra = starBit + } else { + start, err = parseIntOrName(lowAndHigh[0], r.names) + if err != nil { + return 0, err + } + switch len(lowAndHigh) { + case 1: + end = start + case 2: + end, err = parseIntOrName(lowAndHigh[1], r.names) + if err != nil { + return 0, err + } + default: + return 0, fmt.Errorf("too many hyphens: %s", expr) + } + } + + switch len(rangeAndStep) { + case 1: + step = 1 + case 2: + step, err = mustParseInt(rangeAndStep[1]) + if err != nil { + return 0, err + } + + // Special handling: "N/step" means "N-max/step". + if singleDigit { + end = r.max + } + if step > 1 { + extra = 0 + } + default: + return 0, fmt.Errorf("too many slashes: %s", expr) + } + + if start < r.min { + return 0, fmt.Errorf("beginning of range (%d) below minimum (%d): %s", start, r.min, expr) + } + if end > r.max { + return 0, fmt.Errorf("end of range (%d) above maximum (%d): %s", end, r.max, expr) + } + if start > end { + return 0, fmt.Errorf("beginning of range (%d) beyond end of range (%d): %s", start, end, expr) + } + if step == 0 { + return 0, fmt.Errorf("step of range should be a positive number: %s", expr) + } + + return getBits(start, end, step) | extra, nil +} + +// parseIntOrName returns the (possibly-named) integer contained in expr. +func parseIntOrName(expr string, names map[string]uint) (uint, error) { + if names != nil { + if namedInt, ok := names[strings.ToLower(expr)]; ok { + return namedInt, nil + } + } + return mustParseInt(expr) +} + +// mustParseInt parses the given expression as an int or returns an error. +func mustParseInt(expr string) (uint, error) { + num, err := strconv.Atoi(expr) + if err != nil { + return 0, fmt.Errorf("failed to parse int from %s: %s", expr, err) + } + if num < 0 { + return 0, fmt.Errorf("negative number (%d) not allowed: %s", num, expr) + } + + return uint(num), nil +} + +// getBits sets all bits in the range [min, max], modulo the given step size. +func getBits(min, max, step uint) uint64 { + var bits uint64 + + // If step is 1, use shifts. + if step == 1 { + return ^(math.MaxUint64 << (max + 1)) & (math.MaxUint64 << min) + } + + // Else, use a simple loop. + for i := min; i <= max; i += step { + bits |= 1 << i + } + return bits +} + +// all returns all bits within the given bounds. (plus the star bit) +func all(r bounds) uint64 { + return getBits(r.min, r.max, 1) | starBit +} + +// parseDescriptor returns a predefined schedule for the expression, or error if none matches. +func parseDescriptor(descriptor string, loc *time.Location) (Schedule, error) { + switch descriptor { + case "@yearly", "@annually": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: 1 << months.min, + Dow: all(dow), + Location: loc, + }, nil + + case "@monthly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + case "@weekly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: 1 << dow.min, + Location: loc, + }, nil + + case "@daily", "@midnight": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + case "@hourly": + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: all(hours), + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: loc, + }, nil + + } + + const every = "@every " + if strings.HasPrefix(descriptor, every) { + duration, err := time.ParseDuration(descriptor[len(every):]) + if err != nil { + return nil, fmt.Errorf("failed to parse duration %s: %s", descriptor, err) + } + return Every(duration), nil + } + + return nil, fmt.Errorf("unrecognized descriptor: %s", descriptor) +} + +// SpecSchedule specifies a duty cycle (to the second granularity), based on a +// traditional crontab specification. It is computed initially and stored as bit sets. +type SpecSchedule struct { + Second, Minute, Hour, Dom, Month, Dow uint64 + + // Override location for this schedule. + Location *time.Location +} + +// Next returns the next time this schedule is activated, greater than the given +// time. If no time can be found to satisfy the schedule, return the zero time. +func (s *SpecSchedule) Next(t time.Time) time.Time { + // General approach + // + // For Month, Day, Hour, Minute, Second: + // Check if the time value matches. If yes, continue to the next field. + // If the field doesn't match the schedule, then increment the field until it matches. + // While incrementing the field, a wrap-around brings it back to the beginning + // of the field list (since it is necessary to re-verify previous field + // values) + + // Convert the given time into the schedule's timezone, if one is specified. + // Save the original timezone so we can convert back after we find a time. + // Note that schedules without a time zone specified (time.Local) are treated + // as local to the time provided. + origLocation := t.Location() + loc := s.Location + if loc == time.Local { + loc = t.Location() + } + if s.Location != time.Local { + t = t.In(s.Location) + } + + // Start at the earliest possible time (the upcoming second). + t = t.Add(1*time.Second - time.Duration(t.Nanosecond())*time.Nanosecond) + + // This flag indicates whether a field has been incremented. + added := false + + // If no time is found within five years, return zero. + yearLimit := t.Year() + 5 + +WRAP: + if t.Year() > yearLimit { + return time.Time{} + } + + // Find the first applicable month. + // If it's this month, then do nothing. + for 1< 12 { + t = t.Add(time.Duration(24-t.Hour()) * time.Hour) + } else { + t = t.Add(time.Duration(-t.Hour()) * time.Hour) + } + } + + if t.Day() == 1 { + goto WRAP + } + } + + for 1< 0 + dowMatch bool = 1< 0 + ) + if s.Dom&starBit > 0 || s.Dow&starBit > 0 { + return domMatch && dowMatch + } + return domMatch || dowMatch +} + +// bounds provides a range of acceptable values (plus a map of name to value). +type bounds struct { + min, max uint + names map[string]uint +} + +// The bounds for each field. +var ( + seconds = bounds{0, 59, nil} + minutes = bounds{0, 59, nil} + hours = bounds{0, 23, nil} + dom = bounds{1, 31, nil} + months = bounds{1, 12, map[string]uint{ + "jan": 1, + "feb": 2, + "mar": 3, + "apr": 4, + "may": 5, + "jun": 6, + "jul": 7, + "aug": 8, + "sep": 9, + "oct": 10, + "nov": 11, + "dec": 12, + }} + dow = bounds{0, 6, map[string]uint{ + "sun": 0, + "mon": 1, + "tue": 2, + "wed": 3, + "thu": 4, + "fri": 5, + "sat": 6, + }} +) + +const ( + // Set the top bit if a star was included in the expression. + starBit = 1 << 63 +) + +// ConstantDelaySchedule represents a simple recurring duty cycle, e.g. "Every 5 minutes". +// It does not support jobs more frequent than once a second. +type ConstantDelaySchedule struct { + Delay time.Duration +} + +// Every returns a crontab Schedule that activates once every duration. +// Delays of less than a second are not supported (will round up to 1 second). +// Any fields less than a Second are truncated. +func Every(duration time.Duration) ConstantDelaySchedule { + if duration < time.Second { + duration = time.Second + } + return ConstantDelaySchedule{ + Delay: duration - time.Duration(duration.Nanoseconds())%time.Second, + } +} + +// Next returns the next time this should be run. +// This rounds so that the next activation time will be on the second. +func (schedule ConstantDelaySchedule) Next(t time.Time) time.Time { + return t.Add(schedule.Delay - time.Duration(t.Nanosecond())*time.Nanosecond) +} + +// Schedule describes a job's duty cycle. +type Schedule interface { + // Next returns the next activation time, later than the given time. + // Next is invoked initially, and then each time the job is run. + Next(time.Time) time.Time +} diff --git a/common/utils/cronex/parser_test.go b/common/utils/cronex/parser_test.go new file mode 100644 index 00000000..63c521a8 --- /dev/null +++ b/common/utils/cronex/parser_test.go @@ -0,0 +1,383 @@ +package cronex + +import ( + "reflect" + "strings" + "testing" + "time" +) + +var secondParser = NewParser(Second | Minute | Hour | Dom | Month | DowOptional | Descriptor) + +func TestRange(t *testing.T) { + zero := uint64(0) + ranges := []struct { + expr string + min, max uint + expected uint64 + err string + }{ + {"5", 0, 7, 1 << 5, ""}, + {"0", 0, 7, 1 << 0, ""}, + {"7", 0, 7, 1 << 7, ""}, + + {"5-5", 0, 7, 1 << 5, ""}, + {"5-6", 0, 7, 1<<5 | 1<<6, ""}, + {"5-7", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, + + {"5-6/2", 0, 7, 1 << 5, ""}, + {"5-7/2", 0, 7, 1<<5 | 1<<7, ""}, + {"5-7/1", 0, 7, 1<<5 | 1<<6 | 1<<7, ""}, + + {"*", 1, 3, 1<<1 | 1<<2 | 1<<3 | starBit, ""}, + {"*/2", 1, 3, 1<<1 | 1<<3, ""}, + + {"5--5", 0, 0, zero, "too many hyphens"}, + {"jan-x", 0, 0, zero, "failed to parse int from"}, + {"2-x", 1, 5, zero, "failed to parse int from"}, + {"*/-12", 0, 0, zero, "negative number"}, + {"*//2", 0, 0, zero, "too many slashes"}, + {"1", 3, 5, zero, "below minimum"}, + {"6", 3, 5, zero, "above maximum"}, + {"5-3", 3, 5, zero, "beyond end of range"}, + {"*/0", 0, 0, zero, "should be a positive number"}, + } + + for _, c := range ranges { + actual, err := getRange(c.expr, bounds{c.min, c.max, nil}) + if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if len(c.err) == 0 && err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if actual != c.expected { + t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) + } + } +} + +func TestField(t *testing.T) { + fields := []struct { + expr string + min, max uint + expected uint64 + }{ + {"5", 1, 7, 1 << 5}, + {"5,6", 1, 7, 1<<5 | 1<<6}, + {"5,6,7", 1, 7, 1<<5 | 1<<6 | 1<<7}, + {"1,5-7/2,3", 1, 7, 1<<1 | 1<<5 | 1<<7 | 1<<3}, + } + + for _, c := range fields { + actual, _ := getField(c.expr, bounds{c.min, c.max, nil}) + if actual != c.expected { + t.Errorf("%s => expected %d, got %d", c.expr, c.expected, actual) + } + } +} + +func TestAll(t *testing.T) { + allBits := []struct { + r bounds + expected uint64 + }{ + {minutes, 0xfffffffffffffff}, // 0-59: 60 ones + {hours, 0xffffff}, // 0-23: 24 ones + {dom, 0xfffffffe}, // 1-31: 31 ones, 1 zero + {months, 0x1ffe}, // 1-12: 12 ones, 1 zero + {dow, 0x7f}, // 0-6: 7 ones + } + + for _, c := range allBits { + actual := all(c.r) // all() adds the starBit, so compensate for that.. + if c.expected|starBit != actual { + t.Errorf("%d-%d/%d => expected %b, got %b", + c.r.min, c.r.max, 1, c.expected|starBit, actual) + } + } +} + +func TestBits(t *testing.T) { + bits := []struct { + min, max, step uint + expected uint64 + }{ + {0, 0, 1, 0x1}, + {1, 1, 1, 0x2}, + {1, 5, 2, 0x2a}, // 101010 + {1, 4, 2, 0xa}, // 1010 + } + + for _, c := range bits { + actual := getBits(c.min, c.max, c.step) + if c.expected != actual { + t.Errorf("%d-%d/%d => expected %b, got %b", + c.min, c.max, c.step, c.expected, actual) + } + } +} + +func TestParseScheduleErrors(t *testing.T) { + var tests = []struct{ expr, err string }{ + {"* 5 j * * *", "failed to parse int from"}, + {"@every Xm", "failed to parse duration"}, + {"@unrecognized", "unrecognized descriptor"}, + {"* * * *", "expected 5 to 6 fields"}, + {"", "empty spec string"}, + } + for _, c := range tests { + actual, err := secondParser.Parse(c.expr) + if err == nil || !strings.Contains(err.Error(), c.err) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if actual != nil { + t.Errorf("expected nil schedule on error, got %v", actual) + } + } +} + +func TestParseSchedule(t *testing.T) { + tokyo, _ := time.LoadLocation("Asia/Tokyo") + entries := []struct { + parser Parser + expr string + expected Schedule + }{ + {secondParser, "0 5 * * * *", every5min(time.Local)}, + {standardParser, "5 * * * *", every5min(time.Local)}, + {secondParser, "CRON_TZ=UTC 0 5 * * * *", every5min(time.UTC)}, + {standardParser, "CRON_TZ=UTC 5 * * * *", every5min(time.UTC)}, + {secondParser, "CRON_TZ=Asia/Tokyo 0 5 * * * *", every5min(tokyo)}, + {secondParser, "@every 5m", ConstantDelaySchedule{5 * time.Minute}}, + {secondParser, "@midnight", midnight(time.Local)}, + {secondParser, "TZ=UTC @midnight", midnight(time.UTC)}, + {secondParser, "TZ=Asia/Tokyo @midnight", midnight(tokyo)}, + {secondParser, "@yearly", annual(time.Local)}, + {secondParser, "@annually", annual(time.Local)}, + { + parser: secondParser, + expr: "* 5 * * * *", + expected: &SpecSchedule{ + Second: all(seconds), + Minute: 1 << 5, + Hour: all(hours), + Dom: all(dom), + Month: all(months), + Dow: all(dow), + Location: time.Local, + }, + }, + } + + for _, c := range entries { + actual, err := c.parser.Parse(c.expr) + if err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestOptionalSecondSchedule(t *testing.T) { + parser := NewParser(SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor) + entries := []struct { + expr string + expected Schedule + }{ + {"0 5 * * * *", every5min(time.Local)}, + {"5 5 * * * *", every5min5s(time.Local)}, + {"5 * * * *", every5min(time.Local)}, + } + + for _, c := range entries { + actual, err := parser.Parse(c.expr) + if err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestNormalizeFields(t *testing.T) { + tests := []struct { + name string + input []string + options ParseOption + expected []string + }{ + { + "AllFields_NoOptional", + []string{"0", "5", "*", "*", "*", "*"}, + Second | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "AllFields_SecondOptional_Provided", + []string{"0", "5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "AllFields_SecondOptional_NotProvided", + []string{"5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | Dow | Descriptor, + []string{"0", "5", "*", "*", "*", "*"}, + }, + { + "SubsetFields_NoOptional", + []string{"5", "15", "*"}, + Hour | Dom | Month, + []string{"0", "0", "5", "15", "*", "*"}, + }, + { + "SubsetFields_DowOptional_Provided", + []string{"5", "15", "*", "4"}, + Hour | Dom | Month | DowOptional, + []string{"0", "0", "5", "15", "*", "4"}, + }, + { + "SubsetFields_DowOptional_NotProvided", + []string{"5", "15", "*"}, + Hour | Dom | Month | DowOptional, + []string{"0", "0", "5", "15", "*", "*"}, + }, + { + "SubsetFields_SecondOptional_NotProvided", + []string{"5", "15", "*"}, + SecondOptional | Hour | Dom | Month, + []string{"0", "0", "5", "15", "*", "*"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual, err := normalizeFields(test.input, test.options) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !reflect.DeepEqual(actual, test.expected) { + t.Errorf("expected %v, got %v", test.expected, actual) + } + }) + } +} + +func TestNormalizeFields_Errors(t *testing.T) { + tests := []struct { + name string + input []string + options ParseOption + err string + }{ + { + "TwoOptionals", + []string{"0", "5", "*", "*", "*", "*"}, + SecondOptional | Minute | Hour | Dom | Month | DowOptional, + "", + }, + { + "TooManyFields", + []string{"0", "5", "*", "*"}, + SecondOptional | Minute | Hour, + "", + }, + { + "NoFields", + []string{}, + SecondOptional | Minute | Hour, + "", + }, + { + "TooFewFields", + []string{"*"}, + SecondOptional | Minute | Hour, + "", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actual, err := normalizeFields(test.input, test.options) + if err == nil { + t.Errorf("expected an error, got none. results: %v", actual) + } + if !strings.Contains(err.Error(), test.err) { + t.Errorf("expected error %q, got %q", test.err, err.Error()) + } + }) + } +} + +func TestStandardSpecSchedule(t *testing.T) { + entries := []struct { + expr string + expected Schedule + err string + }{ + { + expr: "5 * * * *", + expected: &SpecSchedule{1 << seconds.min, 1 << 5, all(hours), all(dom), all(months), all(dow), time.Local}, + }, + { + expr: "@every 5m", + expected: ConstantDelaySchedule{time.Duration(5) * time.Minute}, + }, + { + expr: "5 j * * *", + err: "failed to parse int from", + }, + { + expr: "* * * *", + err: "expected 5 to 6 fields", + }, + } + + for _, c := range entries { + actual, err := ParseStandard(c.expr) + if len(c.err) != 0 && (err == nil || !strings.Contains(err.Error(), c.err)) { + t.Errorf("%s => expected %v, got %v", c.expr, c.err, err) + } + if len(c.err) == 0 && err != nil { + t.Errorf("%s => unexpected error %v", c.expr, err) + } + if !reflect.DeepEqual(actual, c.expected) { + t.Errorf("%s => expected %b, got %b", c.expr, c.expected, actual) + } + } +} + +func TestNoDescriptorParser(t *testing.T) { + parser := NewParser(Minute | Hour) + _, err := parser.Parse("@every 1m") + if err == nil { + t.Error("expected an error, got none") + } +} + +func every5min(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1 << 0, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +} + +func every5min5s(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1 << 5, 1 << 5, all(hours), all(dom), all(months), all(dow), loc} +} + +func midnight(loc *time.Location) *SpecSchedule { + return &SpecSchedule{1, 1, 1, all(dom), all(months), all(dow), loc} +} + +func annual(loc *time.Location) *SpecSchedule { + return &SpecSchedule{ + Second: 1 << seconds.min, + Minute: 1 << minutes.min, + Hour: 1 << hours.min, + Dom: 1 << dom.min, + Month: 1 << months.min, + Dow: all(dow), + Location: loc, + } +}