test(utils): 添加事件驱动模型测试
- 在 sqrt_test.go 中添加了 fastSqr1 测试函数,用于测试事件驱动模型 - 新增了 Event 和 Uint32AsyncEvent 类型用于测试 - 更新了 go.work、go.mod 和
This commit is contained in:
15
common/utils/event/.gitignore
vendored
Normal file
15
common/utils/event/.gitignore
vendored
Normal file
@@ -0,0 +1,15 @@
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
*.exe~
|
||||
*.dll
|
||||
*.so
|
||||
*.dylib
|
||||
|
||||
# Test binary, built with `go test -c`
|
||||
*.test
|
||||
|
||||
# Output of the go coverage tool, specifically when used with LiteIDE
|
||||
*.out
|
||||
|
||||
# Dependency directories (remove the comment below to include it)
|
||||
# vendor/
|
||||
21
common/utils/event/LICENSE
Normal file
21
common/utils/event/LICENSE
Normal file
@@ -0,0 +1,21 @@
|
||||
MIT License
|
||||
|
||||
Copyright (c) 2023 Bogdan Gabriel Dinu aka Badu
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
||||
232
common/utils/event/README.md
Normal file
232
common/utils/event/README.md
Normal file
@@ -0,0 +1,232 @@
|
||||
# Bus
|
||||
|
||||
- **Independent**: has no external dependencies
|
||||
- **Probably Fast**: no reflection
|
||||
- **Type Safe**: built on generics
|
||||
- **Small and Simple**: can be used as following:
|
||||
|
||||
Having the following event:
|
||||
|
||||
```go
|
||||
package events
|
||||
|
||||
type InterestingEvent struct {
|
||||
}
|
||||
```
|
||||
|
||||
a listener can registering a handler by calling `Sub` method:
|
||||
|
||||
```go
|
||||
package subscriber
|
||||
|
||||
import "github.com/badu/bus"
|
||||
|
||||
// ... somewhere in a setup function / constructor
|
||||
bus.Sub(OnMyEventOccurred)
|
||||
```
|
||||
|
||||
where the handler is having the following signature:
|
||||
|
||||
```go
|
||||
func OnMyEventOccurred(event InterestingEvent){
|
||||
// do something with the event here
|
||||
}
|
||||
```
|
||||
|
||||
The event producer / dispatcher will simply:
|
||||
|
||||
```go
|
||||
package dispatcher
|
||||
|
||||
import "github.com/badu/bus"
|
||||
|
||||
// ...somewhere in a dispatching function
|
||||
|
||||
bus.Pub(InterestingEvent{})
|
||||
```
|
||||
|
||||
If the event needs to go async, in the sense that the bus package will spin up a goroutine for the caller, just
|
||||
implement the following interface:
|
||||
|
||||
```go
|
||||
package events
|
||||
|
||||
func (e InterestingEvent) Async() bool{ return true }
|
||||
```
|
||||
|
||||
if the handler has the signature declared above, or
|
||||
|
||||
```go
|
||||
package events
|
||||
|
||||
func (e *InterestingEvent) Async() bool{ return true }
|
||||
```
|
||||
|
||||
if the handler has the signature as following:
|
||||
|
||||
```go
|
||||
func OnMyEventOccurred(event *InterestingEvent){
|
||||
// do something with the event here
|
||||
}
|
||||
```
|
||||
|
||||
Another way to publish an event async, is to use `PubAsync` method that package exposes.
|
||||
|
||||
By default, the bus is using sync events, which means that it waits for listeners to complete their jobs before calling
|
||||
the next listener.
|
||||
|
||||
Usage : `go get github.com/badu/bus`
|
||||
|
||||
## F.A.Q.
|
||||
|
||||
1. I want to cancel subscription at some point. How do I do that?
|
||||
|
||||
Subscribing returns access to the `Cancel` method
|
||||
|
||||
```go
|
||||
package subscriber
|
||||
|
||||
// ... somewhere in a setup function / constructor
|
||||
subscription := bus.Sub(OnMyEventOccurred)
|
||||
// when needed, calling cancel of subscription, so function OnMyEventOccurred won't be called anymore
|
||||
subscription.Cancel()
|
||||
```
|
||||
|
||||
2. Can I subscribe once?
|
||||
|
||||
Yes! The event handler has to return true.
|
||||
|
||||
```go
|
||||
package subscriber
|
||||
// ... somewhere in a setup function / constructor
|
||||
|
||||
bus.SubCancel( func(event InterestingEvent) bool {
|
||||
// do something with the event here
|
||||
return true // returning true will cancel the subscription
|
||||
})
|
||||
```
|
||||
|
||||
3. I want to inspect registered events. How do I do that?
|
||||
|
||||
The events mapper is a `sync.Map`, so iterate using `Range`
|
||||
|
||||
```go
|
||||
bus.Range(func(k, v any)bool{
|
||||
fmt.Printf("%#v %#v\n", k, v)
|
||||
})
|
||||
```
|
||||
|
||||
4. I want to use my own event names. Is that possible?
|
||||
|
||||
Yes! You have to implement the following interface:
|
||||
|
||||
```go
|
||||
package events
|
||||
|
||||
func (e InterestingEvent) EventID() string{
|
||||
return "YourInterestingEventName"
|
||||
}
|
||||
```
|
||||
|
||||
The event name is the key of the mapper, which means that implementing your own event names might cause panics
|
||||
if you have name collisions.
|
||||
|
||||
5. Will I have race conditions?
|
||||
|
||||
No. The package is concurrent safe.
|
||||
|
||||
## What Problem Does It Solve?
|
||||
|
||||
Decoupling of components: publishers and subscribers can operate independently of each other, with no direct knowledge
|
||||
of each other's existence. This decoupling allows for greater flexibility and scalability, as new publishers and
|
||||
subscribers can be added to the system without disrupting existing components. Also, this facilitates testing by
|
||||
triggering or ignoring certain events in some scenarios.
|
||||
|
||||
Asynchronous messaging: messages can be sent and received asynchronously (by spinning up goroutines), which means that
|
||||
publishers and subscribers don't have to wait for each other to consume their messages. This can improve performance and
|
||||
response times in a system.
|
||||
|
||||
Reliability: the message broker acts as a buffer between publishers and subscribers, ensuring that messages are
|
||||
delivered even if one or more components in the system are temporarily unavailable.
|
||||
|
||||
Modularity: the Pub-Sub pattern can be used to break a monolithic application into smaller, more modular components.
|
||||
Each component can then be developed and tested independently, making the overall system easier to maintain and update.
|
||||
|
||||
## Scenarios of Usage
|
||||
|
||||
Inside the `test_scenarios` folder, you can find the following scenarios:
|
||||
|
||||
1. Fire and Forget.
|
||||
|
||||
Imagine a system / application where we have three services : `users`, `notifications` (email and
|
||||
SMS) and `audit`. When a user registers, we want to send welcoming messages via SMS and email, but we also want to
|
||||
audit that registration for reporting purposes.
|
||||
|
||||
The [UserRegisteredEvent](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/events/main.go#L3)
|
||||
will carry the freshly registered username (which is also the email) and phone to the email and sms services. The
|
||||
event is [triggered](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/users/service.go#L23) by
|
||||
the user service, which performs the creation of the user account. We're using the `fire and forget` technique here,
|
||||
because the operation of registration should not depend on the fact that we've been able to
|
||||
send a welcoming email or a sms, or the audit system malfunctions.
|
||||
|
||||
Simulating audit service malfunctions easy.
|
||||
Instead of using `Sub`, we're using `SubUnsub` to register the listener
|
||||
and return [`true`](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/audit/service.go#L36) to
|
||||
unsubscribe on events of that kind.
|
||||
|
||||
2. Factory Request Reply
|
||||
|
||||
Imagine a system / application where we need to communicate with different microservices, but in this case we don't
|
||||
want to bring them online, we're just wanting to stub the response as those services were alive.
|
||||
|
||||
This technique is useful when we need to test some complicated flows of business logic and facilitates the
|
||||
transformation of an integration test into a classic unit test.
|
||||
|
||||
The `cart` service requires two replies from two other microservices `inventory` and `prices`. In the past, I've been
|
||||
using a closure function to provide the service with both real GRPC clients or with mocks and stubs. The service
|
||||
signature gets complicated and large as one service would depend on a lot of GRPC clients to aggregate data.
|
||||
|
||||
As you can see
|
||||
the [test here](https://github.com/badu/bus/blob/master/test_scenarios/factory-request-reply/main_test.go) it's much
|
||||
more elegant and the service constructor is much slimmer.
|
||||
|
||||
Events are one sync and one async, just to check it works in both scenarios.
|
||||
|
||||
Important to note that because a `WaitGroup` is being used in our event struct, we're forced to pass the events by
|
||||
using a pointer, instead of passing them by value.
|
||||
|
||||
3. Request Reply with Callback
|
||||
|
||||
In this example, we wanted to achieve two things. First is that the `service` and the `repository` are decoupled by
|
||||
events. More than that, we wanted that the events are generic on their own.
|
||||
|
||||
The `orders` service will dispatch a generic request event, one for placing an order, which will carry an `Order` (
|
||||
model) struct with that request and another `OrderStatus` (model) struct using the same generic event.
|
||||
|
||||
We are using a channel inside the generic `RequestEvent` to signal the `reply` to the publisher, which in this case
|
||||
is a callback function that returns the result as if the publisher would have called directly the listener.
|
||||
|
||||
I am sure that you will find this technique interesting and having a large number of applications.
|
||||
|
||||
4. Request Reply with Cancellation
|
||||
|
||||
Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain.
|
||||
The `repository` is simulating a long database call, longer than the context's cancellation, so the service gets the
|
||||
deadline exceeded error.
|
||||
|
||||
Note that this final example is not using a pointer to the event struct, but it contains two properties which have
|
||||
pointers, so the `service` can access the altered `reply`.
|
||||
|
||||
## Recommendations
|
||||
|
||||
1. always place your events inside a separate `events` package, avoiding circular dependencies.
|
||||
2. in general, in `request-reply` scenarios, the events should be passed as pointers (even if it's somewhat slower),
|
||||
because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup`
|
||||
inside your event struct, always use method receivers and pass the event as a pointer — otherwise you will be passing
|
||||
a lock by value (which is `sync.Locker`).
|
||||
3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the
|
||||
dispatcher. You should still have at least one property of that event that is a pointer (see events
|
||||
in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be
|
||||
passed around with an event that is being sent by value, not by pointer.
|
||||
4. you can override the event name (which is by default, built using `fmt.Sprintf("%T", yourEvent)`) you need to
|
||||
implement `EventID() string` interface.
|
||||
271
common/utils/event/bench_test.go
Normal file
271
common/utils/event/bench_test.go
Normal file
@@ -0,0 +1,271 @@
|
||||
package bus_test
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/badu/bus"
|
||||
)
|
||||
|
||||
type Uint32SyncEvent struct {
|
||||
u uint32
|
||||
}
|
||||
|
||||
type Uint32AsyncEvent struct {
|
||||
u uint32
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0008Sync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32SyncEvent]()
|
||||
c := uint32(0)
|
||||
for i := 0; i < 8; i++ {
|
||||
topic.Sub(func(v Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0008Async(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32AsyncEvent]()
|
||||
c := uint32(0)
|
||||
for i := 0; i < 8; i++ {
|
||||
topic.Sub(func(v Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0008PtrSync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32SyncEvent]()
|
||||
c := uint32(0)
|
||||
for i := 0; i < 8; i++ {
|
||||
topic.Sub(func(v *Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(&Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0008PtrAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
c := uint32(0)
|
||||
for i := 0; i < 8; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0256Sync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 256; i++ {
|
||||
topic.Sub(func(v Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0256Async(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 256; i++ {
|
||||
topic.Sub(func(v Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0256PtrSync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 256; i++ {
|
||||
topic.Sub(func(v *Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(&Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_0256PtrAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 256; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_1kSync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 1024; i++ {
|
||||
topic.Sub(func(v Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_1kAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 1024; i++ {
|
||||
topic.Sub(func(v Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_1kPtrSync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 1024; i++ {
|
||||
topic.Sub(func(v *Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(&Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_1kPtrAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 1024; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
func BenchmarkBroadcast_2kSync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 2048; i++ {
|
||||
topic.Sub(func(v Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_2kAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 2048; i++ {
|
||||
topic.Sub(func(v Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_2kPtrSync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32SyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 2048; i++ {
|
||||
topic.Sub(func(v *Uint32SyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.Pub(&Uint32SyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkBroadcast_2kPtrAsync(b *testing.B) {
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
var c uint32
|
||||
for i := 0; i < 2048; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, v.u)
|
||||
})
|
||||
}
|
||||
|
||||
b.RunParallel(func(p *testing.PB) {
|
||||
for p.Next() {
|
||||
topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
}
|
||||
})
|
||||
}
|
||||
3
common/utils/event/go.mod
Normal file
3
common/utils/event/go.mod
Normal file
@@ -0,0 +1,3 @@
|
||||
module github.com/badu/bus
|
||||
|
||||
go 1.20
|
||||
247
common/utils/event/main.go
Normal file
247
common/utils/event/main.go
Normal file
@@ -0,0 +1,247 @@
|
||||
package bus
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
var mapper sync.Map // holds key (event name - string) versus topic values
|
||||
|
||||
// we allow developers to override event names. They should be careful about name collisions
|
||||
type iEventName interface {
|
||||
EventID() string //
|
||||
}
|
||||
|
||||
// if developers implement this interface, we're spinning a goroutine if the event says it is async
|
||||
type iAsync interface {
|
||||
Async() bool
|
||||
}
|
||||
|
||||
// Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic
|
||||
type Listener[T any] struct {
|
||||
parent *Topic[T] // so we can call unsubscribe from parent
|
||||
callback func(event T) // the function that we're going to call
|
||||
}
|
||||
|
||||
// Topic keeps the subscribers of one topic
|
||||
type Topic[T any] struct {
|
||||
subs []*Listener[T] // list of listeners
|
||||
rwMu sync.RWMutex // guards subs
|
||||
lisnsPool sync.Pool // a pool of listeners
|
||||
}
|
||||
|
||||
// NewTopic creates a new topic for a specie of events
|
||||
func NewTopic[T any]() *Topic[T] {
|
||||
result := &Topic[T]{}
|
||||
result.lisnsPool.New = func() any {
|
||||
return &Listener[T]{
|
||||
parent: result,
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Sub adds a callback to be called when an event of that type is being published
|
||||
func (b *Topic[T]) Sub(callback func(v T)) *Listener[T] {
|
||||
result := b.lisnsPool.Get().(*Listener[T])
|
||||
result.callback = callback
|
||||
result.parent = b
|
||||
|
||||
b.rwMu.Lock()
|
||||
b.subs = append(b.subs, result)
|
||||
b.rwMu.Unlock()
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// cancel is private to the topic, but can be accessed via Listener
|
||||
func (b *Topic[T]) cancel(who *Listener[T]) {
|
||||
b.rwMu.Lock()
|
||||
for i := range b.subs {
|
||||
if b.subs[i] != who {
|
||||
continue
|
||||
}
|
||||
|
||||
b.subs[i] = b.subs[len(b.subs)-1]
|
||||
b.subs[len(b.subs)-1] = nil
|
||||
b.subs = b.subs[:len(b.subs)-1]
|
||||
break
|
||||
}
|
||||
b.rwMu.Unlock()
|
||||
|
||||
who.callback = nil
|
||||
b.lisnsPool.Put(who)
|
||||
}
|
||||
|
||||
// NumSubs in case you need to perform tests and check the number of subscribers of this particular topic
|
||||
func (b *Topic[T]) NumSubs() int {
|
||||
b.rwMu.RLock()
|
||||
result := len(b.subs)
|
||||
b.rwMu.RUnlock()
|
||||
return result
|
||||
}
|
||||
|
||||
// Cancel forgets the indicated callback
|
||||
func (s *Listener[T]) Cancel() {
|
||||
s.parent.cancel(s)
|
||||
}
|
||||
|
||||
// Topic gives access to the underlying topic
|
||||
func (s *Listener[T]) Topic() *Topic[T] {
|
||||
return s.parent
|
||||
}
|
||||
|
||||
// Pub allows you to publish an event in that topic
|
||||
func (b *Topic[T]) Pub(event T) {
|
||||
b.rwMu.RLock()
|
||||
|
||||
isAsync := false
|
||||
switch m := any(event).(type) {
|
||||
case iAsync:
|
||||
isAsync = m.Async()
|
||||
}
|
||||
|
||||
for sub := range b.subs {
|
||||
if isAsync {
|
||||
go b.subs[sub].callback(event)
|
||||
continue
|
||||
}
|
||||
|
||||
b.subs[sub].callback(event)
|
||||
}
|
||||
|
||||
b.rwMu.RUnlock()
|
||||
}
|
||||
|
||||
func (b *Topic[T]) PubAsync(event T) {
|
||||
b.rwMu.RLock()
|
||||
|
||||
for sub := range b.subs {
|
||||
go b.subs[sub].callback(event)
|
||||
}
|
||||
|
||||
b.rwMu.RUnlock()
|
||||
}
|
||||
|
||||
// Bus is being returned when you subscribe, so you can manually Cancel
|
||||
type Bus[T any] struct {
|
||||
listener *Listener[T]
|
||||
stop atomic.Uint32 // flag for unsubscribing after receiving one event
|
||||
}
|
||||
|
||||
// Cancel allows callers to manually unsubscribe, in case they don't want to use SubCancel
|
||||
func (o *Bus[T]) Cancel() {
|
||||
if o.stop.CompareAndSwap(0, 1) {
|
||||
go o.listener.Cancel()
|
||||
}
|
||||
}
|
||||
|
||||
// SubCancel can be used if you need to unsubscribe immediately after receiving an event, by making your function return true
|
||||
func SubCancel[T any](callback func(event T) bool) *Bus[T] {
|
||||
var (
|
||||
event T
|
||||
key string
|
||||
)
|
||||
|
||||
switch m := any(event).(type) {
|
||||
case iEventName:
|
||||
key = m.EventID()
|
||||
default:
|
||||
key = fmt.Sprintf("%T", event)
|
||||
}
|
||||
|
||||
topic, ok := mapper.Load(key)
|
||||
if !ok || topic == nil {
|
||||
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
|
||||
}
|
||||
|
||||
var result Bus[T]
|
||||
|
||||
result.listener = topic.(*Topic[T]).Sub(func(v T) {
|
||||
if result.stop.Load() == 1 {
|
||||
return
|
||||
}
|
||||
|
||||
shouldCancel := callback(v)
|
||||
if shouldCancel {
|
||||
result.Cancel()
|
||||
}
|
||||
|
||||
})
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
// Sub subscribes a callback function to listen for a specie of events
|
||||
func Sub[T any](callback func(event T)) *Bus[T] {
|
||||
var (
|
||||
event T
|
||||
key string
|
||||
)
|
||||
|
||||
switch m := any(event).(type) {
|
||||
case iEventName:
|
||||
key = m.EventID()
|
||||
default:
|
||||
key = fmt.Sprintf("%T", event)
|
||||
}
|
||||
|
||||
topic, ok := mapper.Load(key)
|
||||
if !ok || topic == nil {
|
||||
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
|
||||
}
|
||||
|
||||
var result Bus[T]
|
||||
|
||||
result.listener = topic.(*Topic[T]).Sub(func(v T) {
|
||||
if result.stop.Load() == 1 {
|
||||
return
|
||||
}
|
||||
callback(v)
|
||||
})
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
// Pub publishes an event which will be dispatched to all listeners
|
||||
func Pub[T any](event T) {
|
||||
var key string
|
||||
|
||||
switch m := any(event).(type) {
|
||||
case iEventName:
|
||||
key = m.EventID()
|
||||
default:
|
||||
key = fmt.Sprintf("%T", event)
|
||||
}
|
||||
|
||||
topic, ok := mapper.Load(key)
|
||||
if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic)
|
||||
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
|
||||
}
|
||||
|
||||
topic.(*Topic[T]).Pub(event)
|
||||
}
|
||||
|
||||
// PubAsync publishes an event which will be dispatched to all listeners
|
||||
func PubAsync[T any](event T) {
|
||||
var key string
|
||||
|
||||
switch m := any(event).(type) {
|
||||
case iEventName:
|
||||
key = m.EventID()
|
||||
default:
|
||||
key = fmt.Sprintf("%T", event)
|
||||
}
|
||||
|
||||
topic, ok := mapper.Load(key)
|
||||
if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic)
|
||||
topic, _ = mapper.LoadOrStore(key, NewTopic[T]())
|
||||
}
|
||||
topic.(*Topic[T]).PubAsync(event)
|
||||
}
|
||||
|
||||
// Range gives access to mapper Range
|
||||
func Range(f func(k, v any) bool) {
|
||||
mapper.Range(f)
|
||||
}
|
||||
168
common/utils/event/main_test.go
Normal file
168
common/utils/event/main_test.go
Normal file
@@ -0,0 +1,168 @@
|
||||
package bus_test
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
|
||||
"github.com/badu/bus"
|
||||
)
|
||||
|
||||
func TestSubTopicWhilePub(t *testing.T) {
|
||||
// scenario : we have a large number of subscribers.
|
||||
// we publish an event and while doing that,
|
||||
// we register another one on a different goroutine
|
||||
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
for i := 0; i < 4096; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {})
|
||||
}
|
||||
|
||||
finishPubWait := make(chan struct{})
|
||||
finishSubWait := make(chan struct{})
|
||||
start := make(chan struct{})
|
||||
|
||||
go func() {
|
||||
<-start
|
||||
topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
defer close(finishPubWait)
|
||||
}()
|
||||
|
||||
newSubCalled := false
|
||||
|
||||
go func() {
|
||||
<-start
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
newSubCalled = true
|
||||
})
|
||||
close(finishSubWait)
|
||||
}()
|
||||
|
||||
close(start) // start both goroutines
|
||||
|
||||
<-finishPubWait // wait for pub to finish
|
||||
|
||||
<-finishSubWait // wait for sub to finish
|
||||
|
||||
if newSubCalled {
|
||||
t.Fatal("new subscriber should not be called")
|
||||
}
|
||||
}
|
||||
|
||||
func TestReusePayloadPointerAsync(t *testing.T) {
|
||||
// if you reuse the payload, you can alter it's content, of course
|
||||
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
c := uint32(0)
|
||||
for i := 0; i < 4096; i++ {
|
||||
k := i
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
if v.u == 2048 {
|
||||
atomic.AddUint32(&c, 1)
|
||||
return
|
||||
}
|
||||
|
||||
if k == 2048 {
|
||||
v.u = 2048
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
finishPubWait := make(chan struct{})
|
||||
|
||||
payload := Uint32AsyncEvent{u: 1}
|
||||
|
||||
topic.Pub(&payload)
|
||||
|
||||
close(finishPubWait)
|
||||
|
||||
<-finishPubWait // wait for pub to finish
|
||||
|
||||
t.Logf("altered payload %d for %d listeners", payload.u, c)
|
||||
}
|
||||
|
||||
func TestAsyncBus(t *testing.T) {
|
||||
c := uint32(0)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(4096)
|
||||
bus.Sub(
|
||||
func(event Uint32AsyncEvent) {
|
||||
atomic.AddUint32(&c, 1)
|
||||
wg.Done()
|
||||
},
|
||||
)
|
||||
|
||||
go func() {
|
||||
for i := 0; i < 1024; i++ {
|
||||
bus.PubAsync(Uint32AsyncEvent{})
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < 1024; i++ {
|
||||
bus.PubAsync(Uint32AsyncEvent{})
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < 1024; i++ {
|
||||
bus.PubAsync(Uint32AsyncEvent{})
|
||||
}
|
||||
}()
|
||||
go func() {
|
||||
for i := 0; i < 1024; i++ {
|
||||
bus.PubAsync(Uint32AsyncEvent{})
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
if c != 4096 {
|
||||
t.Fatalf("error : counter should be 4096 but is %d", c)
|
||||
}
|
||||
|
||||
t.Logf("%d", c)
|
||||
}
|
||||
|
||||
func TestRange(t *testing.T) {
|
||||
|
||||
type Event1 struct{}
|
||||
type Event2 struct{}
|
||||
type Event3 struct{}
|
||||
type Event4 struct{}
|
||||
type Event5 struct{}
|
||||
|
||||
bus.Sub(func(e Event1) {})
|
||||
bus.Sub(func(e Event2) {})
|
||||
bus.Sub(func(e Event2) {})
|
||||
bus.Sub(func(e Event3) {})
|
||||
bus.Sub(func(e Event3) {})
|
||||
bus.Sub(func(e Event3) {})
|
||||
bus.Sub(func(e Event4) {})
|
||||
bus.Sub(func(e Event4) {})
|
||||
bus.Sub(func(e Event4) {})
|
||||
bus.Sub(func(e Event4) {})
|
||||
bus.Sub(func(e Event5) {})
|
||||
bus.Sub(func(e Event5) {})
|
||||
bus.Sub(func(e Event5) {})
|
||||
bus.Sub(func(e Event5) {})
|
||||
bus.Sub(func(e Event5) {})
|
||||
|
||||
seen := map[string]struct{}{
|
||||
"bus_test.Event2": {},
|
||||
"bus_test.Event3": {},
|
||||
"bus_test.Event1": {},
|
||||
"bus_test.Event5": {},
|
||||
"bus_test.Event4": {},
|
||||
}
|
||||
|
||||
bus.Range(func(k, _ any) bool {
|
||||
if _, has := seen[k.(string)]; has {
|
||||
delete(seen, k.(string))
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
if len(seen) > 0 {
|
||||
t.Fatalf("error : not all events were seen")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,47 @@
|
||||
package cart
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/events"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/inventory"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/prices"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
}
|
||||
|
||||
func NewService(sb *strings.Builder) ServiceImpl {
|
||||
result := ServiceImpl{sb: sb}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) AddProductToCart(ctx context.Context, productID string) error {
|
||||
inventoryClientRequest := events.NewInventoryGRPCClientRequestEvent()
|
||||
bus.Pub(inventoryClientRequest)
|
||||
inventoryClientRequest.WaitReply()
|
||||
|
||||
pricesClientRequest := events.NewPricesGRPCClientRequestEvent()
|
||||
bus.Pub(pricesClientRequest)
|
||||
pricesClientRequest.WaitReply()
|
||||
|
||||
defer inventoryClientRequest.Conn.Close() // close GRPC connection when done
|
||||
stockResponse, err := inventoryClientRequest.Client.GetStockForProduct(ctx, &inventory.ProductIDRequest{ID: productID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer pricesClientRequest.Conn.Close() // close GRPC connection when done
|
||||
priceResponse, err := pricesClientRequest.Client.GetPricesForProduct(ctx, &prices.ProductIDRequest{ID: productID})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.sb.WriteString(fmt.Sprintf("stock %0.2fpcs @ price %0.2f$\n", stockResponse.Stock, priceResponse.Price))
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,56 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/inventory"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/prices"
|
||||
)
|
||||
|
||||
type InventoryGRPCClientRequestEvent struct {
|
||||
wg sync.WaitGroup
|
||||
Conn Closer // should be *grpc.ClientConn, but we're avoiding the import
|
||||
Client inventory.ServiceClient
|
||||
}
|
||||
|
||||
func NewInventoryGRPCClientRequestEvent() *InventoryGRPCClientRequestEvent {
|
||||
result := InventoryGRPCClientRequestEvent{}
|
||||
result.wg.Add(1)
|
||||
return &result
|
||||
}
|
||||
|
||||
func (i *InventoryGRPCClientRequestEvent) Async() bool {
|
||||
return true // this one is async
|
||||
}
|
||||
|
||||
func (i *InventoryGRPCClientRequestEvent) WaitReply() {
|
||||
i.wg.Wait()
|
||||
}
|
||||
|
||||
func (i *InventoryGRPCClientRequestEvent) Reply() {
|
||||
i.wg.Done()
|
||||
}
|
||||
|
||||
type PricesGRPCClientRequestEvent struct {
|
||||
wg sync.WaitGroup
|
||||
Conn Closer // should be *grpc.ClientConn, but we're avoiding the import
|
||||
Client prices.ServiceClient
|
||||
}
|
||||
|
||||
func NewPricesGRPCClientRequestEvent() *PricesGRPCClientRequestEvent {
|
||||
result := PricesGRPCClientRequestEvent{}
|
||||
result.wg.Add(1)
|
||||
return &result
|
||||
}
|
||||
|
||||
func (p *PricesGRPCClientRequestEvent) WaitReply() {
|
||||
p.wg.Wait()
|
||||
}
|
||||
|
||||
func (p *PricesGRPCClientRequestEvent) Reply() {
|
||||
p.wg.Done()
|
||||
}
|
||||
|
||||
type Closer interface {
|
||||
Close() error
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package inventory
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type ServiceClient interface {
|
||||
GetStockForProduct(ctx context.Context, in *ProductIDRequest) (*ProductStockResponse, error) // , opts ...grpc.CallOption) (*ProductStockResponse, error)
|
||||
}
|
||||
|
||||
type ProductIDRequest struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
type ProductStockResponse struct {
|
||||
Stock float64
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package inventory
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
}
|
||||
|
||||
func NewService() ServiceImpl {
|
||||
result := ServiceImpl{}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) GetStockForProduct(ctx context.Context, productID string) {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,81 @@
|
||||
package factory_request_reply
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/cart"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/events"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/inventory"
|
||||
"github.com/badu/bus/test_scenarios/factory-request-reply/prices"
|
||||
)
|
||||
|
||||
var sb strings.Builder
|
||||
|
||||
type pricesClientStub struct{}
|
||||
|
||||
func (s *pricesClientStub) GetPricesForProduct(ctx context.Context, in *prices.ProductIDRequest) (*prices.ProductPriceResponse, error) {
|
||||
return &prices.ProductPriceResponse{Price: 10.30}, nil
|
||||
}
|
||||
|
||||
type fakeCloser struct {
|
||||
}
|
||||
|
||||
func (f *fakeCloser) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func OnPricesGRPCClientStubRequest(e *events.PricesGRPCClientRequestEvent) {
|
||||
sb.WriteString("OnPricesGRPCClientStubRequest\n")
|
||||
e.Client = &pricesClientStub{}
|
||||
e.Conn = &fakeCloser{}
|
||||
<-time.After(300 * time.Millisecond)
|
||||
e.Reply()
|
||||
}
|
||||
|
||||
type inventoryClientStub struct{}
|
||||
|
||||
func (s *inventoryClientStub) GetStockForProduct(ctx context.Context, in *inventory.ProductIDRequest) (*inventory.ProductStockResponse, error) {
|
||||
return &inventory.ProductStockResponse{Stock: 200}, nil
|
||||
}
|
||||
|
||||
func OnInventoryGRPCClientStubRequest(e *events.InventoryGRPCClientRequestEvent) {
|
||||
sb.WriteString("OnInventoryGRPCClientStubRequest\n")
|
||||
e.Client = &inventoryClientStub{}
|
||||
e.Conn = &fakeCloser{}
|
||||
<-time.After(300 * time.Millisecond)
|
||||
e.Reply()
|
||||
}
|
||||
|
||||
func TestGRPCClientStub(t *testing.T) {
|
||||
|
||||
cartSvc := cart.NewService(&sb)
|
||||
|
||||
bus.Sub(OnInventoryGRPCClientStubRequest)
|
||||
bus.Sub(OnPricesGRPCClientStubRequest)
|
||||
|
||||
err := cartSvc.AddProductToCart(context.Background(), "1")
|
||||
if err != nil {
|
||||
t.Fatalf("error adding product to cart : %#v", err)
|
||||
}
|
||||
|
||||
err = cartSvc.AddProductToCart(context.Background(), "2")
|
||||
if err != nil {
|
||||
t.Fatalf("error adding product to cart : %#v", err)
|
||||
}
|
||||
|
||||
const expecting = "OnInventoryGRPCClientStubRequest\n" +
|
||||
"OnPricesGRPCClientStubRequest\n" +
|
||||
"stock 200.00pcs @ price 10.30$\n" +
|
||||
"OnInventoryGRPCClientStubRequest\n" +
|
||||
"OnPricesGRPCClientStubRequest\n" +
|
||||
"stock 200.00pcs @ price 10.30$\n"
|
||||
|
||||
got := sb.String()
|
||||
if got != expecting {
|
||||
t.Fatalf("expecting :\n%s but got : \n%s", expecting, got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,17 @@
|
||||
package prices
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type ServiceClient interface {
|
||||
GetPricesForProduct(ctx context.Context, in *ProductIDRequest) (*ProductPriceResponse, error) // , opts ...grpc.CallOption) (*ProductPriceResponse, error)
|
||||
}
|
||||
|
||||
type ProductIDRequest struct {
|
||||
ID string
|
||||
}
|
||||
|
||||
type ProductPriceResponse struct {
|
||||
Price float64
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package prices
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
}
|
||||
|
||||
func NewService() ServiceImpl {
|
||||
result := ServiceImpl{}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) GetPricesForProduct(ctx context.Context, productID string) {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,37 @@
|
||||
package audit
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
}
|
||||
|
||||
func NewAuditService(sb *strings.Builder) ServiceImpl {
|
||||
result := ServiceImpl{sb: sb}
|
||||
bus.Sub(result.OnUserRegisteredEvent)
|
||||
bus.SubCancel(result.OnSMSRequestEvent)
|
||||
bus.SubCancel(result.OnSMSSentEvent)
|
||||
return result
|
||||
}
|
||||
|
||||
// OnUserRegisteredEvent is classic event handler
|
||||
func (s *ServiceImpl) OnUserRegisteredEvent(event events.UserRegisteredEvent) {
|
||||
// we can save audit data here
|
||||
}
|
||||
|
||||
// OnSMSRequestEvent is a pub-unsub type, we have to return 'false' to continue listening for this kind of events
|
||||
func (s *ServiceImpl) OnSMSRequestEvent(event events.SMSRequestEvent) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// OnSMSSentEvent is a pub-unsub type where we give up on listening after receiving first message
|
||||
func (s *ServiceImpl) OnSMSSentEvent(event events.SMSSentEvent) bool {
|
||||
s.sb.WriteString(fmt.Sprintf("audit event : an sms was %s sent to %s with message %s\n", event.Status, event.Request.Number, event.Request.Message))
|
||||
return true // after first event, audit will give up listening for events
|
||||
}
|
||||
@@ -0,0 +1,36 @@
|
||||
package events
|
||||
|
||||
type UserRegisteredEvent struct {
|
||||
UserName string
|
||||
Phone string
|
||||
}
|
||||
|
||||
func (e UserRegisteredEvent) Async() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type SMSRequestEvent struct {
|
||||
Number string
|
||||
Message string
|
||||
}
|
||||
|
||||
func (e SMSRequestEvent) Async() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type SMSSentEvent struct {
|
||||
Request SMSRequestEvent
|
||||
Status string
|
||||
}
|
||||
|
||||
func (e SMSSentEvent) Async() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type DummyEvent struct {
|
||||
AlteredAsync bool
|
||||
}
|
||||
|
||||
func (e *DummyEvent) Async() bool {
|
||||
return e.AlteredAsync
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
package fire_and_forget
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/audit"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/notifications"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/users"
|
||||
)
|
||||
|
||||
func OnDummyEvent(event *events.DummyEvent) {
|
||||
fmt.Println("dummy event async ?", event.Async())
|
||||
}
|
||||
|
||||
func TestUserRegistration(t *testing.T) {
|
||||
var sb strings.Builder
|
||||
|
||||
userSvc := users.NewService(&sb)
|
||||
notifications.NewSmsService(&sb)
|
||||
notifications.NewEmailService(&sb)
|
||||
audit.NewAuditService(&sb)
|
||||
|
||||
bus.Sub(OnDummyEvent)
|
||||
|
||||
userSvc.RegisterUser(context.Background(), "Badu", "+40742222222")
|
||||
|
||||
<-time.After(500 * time.Millisecond)
|
||||
|
||||
userSvc.RegisterUser(context.Background(), "Adina", "+40743333333")
|
||||
|
||||
<-time.After(500 * time.Millisecond)
|
||||
|
||||
const expecting = "user Badu has registered - sending welcome email message\n" +
|
||||
"sms sent requested for number +40742222222 with message Badu your user account was created. Check your email for instructions\n" +
|
||||
"audit event : an sms was successfully sent sent to +40742222222 with message Badu your user account was created. Check your email for instructions\n" +
|
||||
"user Adina has registered - sending welcome email message\n" +
|
||||
"sms sent requested for number +40743333333 with message Adina your user account was created. Check your email for instructions\n"
|
||||
|
||||
got := sb.String()
|
||||
if got != expecting {
|
||||
t.Fatalf("expecting :\n%s but got : \n%s", expecting, got)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package notifications
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
|
||||
)
|
||||
|
||||
type EmailServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
}
|
||||
|
||||
func NewEmailService(sb *strings.Builder) EmailServiceImpl {
|
||||
result := EmailServiceImpl{sb: sb}
|
||||
bus.Sub(result.OnUserRegisteredEvent)
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *EmailServiceImpl) OnUserRegisteredEvent(e events.UserRegisteredEvent) {
|
||||
s.sb.WriteString(fmt.Sprintf("user %s has registered - sending welcome email message\n", e.UserName))
|
||||
bus.Pub(events.SMSRequestEvent{
|
||||
Number: e.Phone,
|
||||
Message: fmt.Sprintf("%s your user account was created. Check your email for instructions", e.UserName),
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,27 @@
|
||||
package notifications
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
|
||||
)
|
||||
|
||||
type SmsServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
}
|
||||
|
||||
func NewSmsService(sb *strings.Builder) SmsServiceImpl {
|
||||
result := SmsServiceImpl{sb: sb}
|
||||
bus.Sub(result.OnSMSSendRequest)
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *SmsServiceImpl) OnSMSSendRequest(event events.SMSRequestEvent) {
|
||||
s.sb.WriteString(fmt.Sprintf("sms sent requested for number %s with message %s\n", event.Number, event.Message))
|
||||
bus.Pub(events.SMSSentEvent{
|
||||
Request: event,
|
||||
Status: "successfully sent",
|
||||
})
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package users
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/fire-and-forget/events"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
c int
|
||||
}
|
||||
|
||||
func NewService(sb *strings.Builder) ServiceImpl {
|
||||
result := ServiceImpl{sb: sb}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) RegisterUser(ctx context.Context, name, phone string) {
|
||||
s.c++
|
||||
bus.Pub(events.UserRegisteredEvent{UserName: name, Phone: phone})
|
||||
bus.Pub(&events.DummyEvent{AlteredAsync: s.c%2 == 0}) // nobody listens on this one
|
||||
}
|
||||
@@ -0,0 +1,18 @@
|
||||
package events
|
||||
|
||||
type RequestEvent[T any] struct {
|
||||
Payload T
|
||||
Callback func() (*T, error)
|
||||
Done chan struct{}
|
||||
}
|
||||
|
||||
func NewRequestEvent[T any](payload T) *RequestEvent[T] {
|
||||
return &RequestEvent[T]{
|
||||
Payload: payload,
|
||||
Done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (i *RequestEvent[T]) Async() bool {
|
||||
return true // this one is async
|
||||
}
|
||||
@@ -0,0 +1,59 @@
|
||||
package request_reply_callback
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/badu/bus/test_scenarios/request-reply-callback/orders"
|
||||
)
|
||||
|
||||
func TestRequestReplyCallback(t *testing.T) {
|
||||
var sb strings.Builder
|
||||
orders.NewRepository(&sb)
|
||||
svc := orders.NewService(&sb)
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
newOrder0, err := svc.RegisterOrder(ctx, []int{1, 2, 3})
|
||||
if err != nil {
|
||||
t.Fatalf("error creating order : %#v", err)
|
||||
}
|
||||
|
||||
t.Logf("new order #0 : %#v", newOrder0)
|
||||
|
||||
newOrder1, err := svc.RegisterOrder(ctx, []int{4, 5, 6})
|
||||
if err != nil {
|
||||
t.Fatalf("error creating order : %#v", err)
|
||||
}
|
||||
|
||||
t.Logf("new order #1 : %#v", newOrder1)
|
||||
newOrder2, err := svc.RegisterOrder(ctx, []int{7, 8, 9})
|
||||
if err != nil {
|
||||
t.Fatalf("error creating order : %#v", err)
|
||||
}
|
||||
|
||||
t.Logf("new order #2 : %#v", newOrder2)
|
||||
|
||||
stat0, err := svc.GetOrderStatus(ctx, newOrder0.OrderID)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting order status : %#v", err)
|
||||
}
|
||||
t.Logf("order #0 status : %s", stat0.Status)
|
||||
|
||||
stat1, err := svc.GetOrderStatus(ctx, newOrder1.OrderID)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting order status : %#v", err)
|
||||
}
|
||||
|
||||
t.Logf("order #1 status : %s", stat1.Status)
|
||||
|
||||
stat2, err := svc.GetOrderStatus(ctx, newOrder2.OrderID)
|
||||
if err != nil {
|
||||
t.Fatalf("error getting order status : %#v", err)
|
||||
}
|
||||
t.Logf("order #2 status : %s", stat2.Status)
|
||||
|
||||
t.Logf("%s", sb.String())
|
||||
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package orders
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/request-reply-callback/events"
|
||||
)
|
||||
|
||||
type Order struct {
|
||||
OrderID int
|
||||
ProductIDs []int
|
||||
}
|
||||
|
||||
type OrderStatus struct {
|
||||
OrderID int
|
||||
Status string
|
||||
}
|
||||
|
||||
type RepositoryImpl struct {
|
||||
sb *strings.Builder
|
||||
calls int
|
||||
}
|
||||
|
||||
func NewRepository(sb *strings.Builder) RepositoryImpl {
|
||||
result := RepositoryImpl{sb: sb}
|
||||
bus.Sub(result.onCreateOrder)
|
||||
bus.Sub(result.onGetOrderStatus)
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *RepositoryImpl) onCreateOrder(event *events.RequestEvent[Order]) {
|
||||
defer func() { r.calls++ }()
|
||||
|
||||
<-time.After(500 * time.Millisecond) // simulate heavy database call
|
||||
|
||||
event.Callback = func() (*Order, error) {
|
||||
return &Order{OrderID: r.calls, ProductIDs: event.Payload.ProductIDs}, nil
|
||||
}
|
||||
|
||||
close(event.Done)
|
||||
}
|
||||
|
||||
func (r *RepositoryImpl) onGetOrderStatus(event *events.RequestEvent[OrderStatus]) {
|
||||
<-time.After(300 * time.Millisecond) // simulate heavy database call
|
||||
|
||||
event.Callback = func() (*OrderStatus, error) {
|
||||
status := "in_progress"
|
||||
if event.Payload.OrderID == 3 {
|
||||
status = "cancelled"
|
||||
}
|
||||
return &OrderStatus{OrderID: event.Payload.OrderID, Status: status}, nil
|
||||
}
|
||||
|
||||
close(event.Done)
|
||||
}
|
||||
@@ -0,0 +1,35 @@
|
||||
package orders
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/request-reply-callback/events"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
sb *strings.Builder
|
||||
}
|
||||
|
||||
func NewService(sb *strings.Builder) ServiceImpl {
|
||||
result := ServiceImpl{sb: sb}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) RegisterOrder(ctx context.Context, productIDs []int) (*Order, error) {
|
||||
event := events.NewRequestEvent[Order](Order{ProductIDs: productIDs})
|
||||
s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event))
|
||||
bus.Pub(event)
|
||||
<-event.Done // wait for "reply"
|
||||
return event.Callback() // return the callback, which is containing the actual result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) GetOrderStatus(ctx context.Context, orderID int) (*OrderStatus, error) {
|
||||
event := events.NewRequestEvent[OrderStatus](OrderStatus{OrderID: orderID})
|
||||
s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event))
|
||||
bus.Pub(event)
|
||||
<-event.Done // wait for "reply"
|
||||
return event.Callback() // return the callback, which is containing the actual result
|
||||
}
|
||||
@@ -0,0 +1,33 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
|
||||
type EventState struct {
|
||||
Ctx context.Context
|
||||
Done chan struct{} `json:"-"`
|
||||
Error error
|
||||
}
|
||||
|
||||
func NewEventState(ctx context.Context) *EventState {
|
||||
return &EventState{
|
||||
Ctx: ctx,
|
||||
Done: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *EventState) Close() {
|
||||
s.Error = s.Ctx.Err()
|
||||
close(s.Done)
|
||||
}
|
||||
|
||||
type NewOrder struct {
|
||||
ID int
|
||||
}
|
||||
|
||||
type CreateOrderEvent struct {
|
||||
NewOrder *NewOrder
|
||||
ProductIDs []int
|
||||
State *EventState
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
package request_reply
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/badu/bus/test_scenarios/request-reply-with-cancellation/orders"
|
||||
)
|
||||
|
||||
func TestRequestReplyWithCancellation(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*1)
|
||||
svc := orders.NewService()
|
||||
orders.NewRepository()
|
||||
|
||||
response, err := svc.CreateOrder(ctx, []int{1, 2, 3})
|
||||
switch err {
|
||||
default:
|
||||
t.Fatalf("error : it supposed to timeout, but it responded %#v and the error is %#v", response, err)
|
||||
case context.DeadlineExceeded:
|
||||
// what we were expecting
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
@@ -0,0 +1,41 @@
|
||||
package orders
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/request-reply-with-cancellation/events"
|
||||
)
|
||||
|
||||
type Order struct {
|
||||
ID int
|
||||
ProductIDs []int
|
||||
}
|
||||
|
||||
type RepositoryImpl struct {
|
||||
calls int
|
||||
}
|
||||
|
||||
func NewRepository() RepositoryImpl {
|
||||
result := RepositoryImpl{}
|
||||
bus.Sub(result.OnCreateOrder)
|
||||
return result
|
||||
}
|
||||
|
||||
func (r *RepositoryImpl) OnCreateOrder(event events.CreateOrderEvent) {
|
||||
defer func() {
|
||||
r.calls++
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-time.After(4 * time.Second):
|
||||
event.NewOrder = &events.NewOrder{ID: r.calls}
|
||||
event.State.Close()
|
||||
return
|
||||
case <-event.State.Ctx.Done():
|
||||
event.State.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
package orders
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/badu/bus"
|
||||
"github.com/badu/bus/test_scenarios/request-reply-with-cancellation/events"
|
||||
)
|
||||
|
||||
type ServiceImpl struct {
|
||||
}
|
||||
|
||||
func NewService() ServiceImpl {
|
||||
result := ServiceImpl{}
|
||||
return result
|
||||
}
|
||||
|
||||
func (s *ServiceImpl) CreateOrder(ctx context.Context, productIDs []int) (*Order, error) {
|
||||
event := events.CreateOrderEvent{State: events.NewEventState(ctx), ProductIDs: productIDs, NewOrder: &events.NewOrder{}}
|
||||
bus.Pub(event)
|
||||
<-event.State.Done
|
||||
|
||||
if event.NewOrder != nil && event.State.Error == nil {
|
||||
return &Order{ID: event.NewOrder.ID, ProductIDs: productIDs}, nil
|
||||
}
|
||||
|
||||
return nil, event.State.Error
|
||||
}
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
|
||||
"github.com/badu/bus"
|
||||
)
|
||||
|
||||
func Test_fastSqrt(t *testing.T) {
|
||||
@@ -89,3 +91,78 @@ func BenchmarkFastSqrtInt(b *testing.B) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func Test_fastSqr1(b *testing.T) {
|
||||
// scenario : we have a large number of subscribers.
|
||||
// we publish an event and while doing that,
|
||||
// we register another one on a different goroutine
|
||||
|
||||
topic := bus.NewTopic[*Uint32AsyncEvent]()
|
||||
for i := 0; i < 4096; i++ {
|
||||
topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
print(v.u)
|
||||
})
|
||||
}
|
||||
|
||||
topic.Pub(&Uint32AsyncEvent{u: 1})
|
||||
// finishPubWait := make(chan struct{})
|
||||
// finishSubWait := make(chan struct{})
|
||||
// start := make(chan struct{})
|
||||
|
||||
// go func() {
|
||||
// <-start
|
||||
// topic.PubAsync(&Uint32AsyncEvent{u: 1})
|
||||
// defer close(finishPubWait)
|
||||
// }()
|
||||
|
||||
// newSubCalled := false
|
||||
|
||||
// go func() {
|
||||
// <-start
|
||||
// topic.Sub(func(v *Uint32AsyncEvent) {
|
||||
// newSubCalled = true
|
||||
// })
|
||||
// close(finishSubWait)
|
||||
// }()
|
||||
|
||||
// close(start) // start both goroutines
|
||||
|
||||
// <-finishPubWait // wait for pub to finish
|
||||
|
||||
// <-finishSubWait // wait for sub to finish
|
||||
|
||||
// if newSubCalled {
|
||||
// print("new subscriber called")
|
||||
// }
|
||||
|
||||
}
|
||||
|
||||
// Various event types
|
||||
const EventA = 0x01
|
||||
|
||||
type Uint32AsyncEvent struct {
|
||||
u uint32
|
||||
}
|
||||
|
||||
// Event type for testing purposes
|
||||
type Event struct {
|
||||
Data string
|
||||
type1 uint32
|
||||
}
|
||||
|
||||
// Type returns the event type
|
||||
func (ev Event) Type() uint32 {
|
||||
return ev.type1
|
||||
}
|
||||
|
||||
// newEventA creates a new instance of an event
|
||||
func newEventA(data string) Event {
|
||||
return Event{Data: data, type1: EventA}
|
||||
}
|
||||
|
||||
// Various event types
|
||||
const EventB1 = 0x02
|
||||
|
||||
func newEventB(data string) Event {
|
||||
return Event{Data: data, type1: EventB1}
|
||||
}
|
||||
|
||||
1
go.work
1
go.work
@@ -8,6 +8,7 @@ use (
|
||||
./common/cool
|
||||
./common/utils/bitset
|
||||
./common/utils/bytearray
|
||||
./common/utils/event
|
||||
./common/utils/go-jsonrpc
|
||||
./common/utils/log
|
||||
./common/utils/sturc
|
||||
|
||||
@@ -11,6 +11,7 @@ require (
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/jackc/puddle/v2 v2.2.1 // indirect
|
||||
github.com/kelindar/event v1.5.2 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/panjf2000/ants/v2 v2.11.3 // indirect
|
||||
|
||||
@@ -47,6 +47,8 @@ github.com/jackc/pgx/v5 v5.5.3/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiw
|
||||
github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
|
||||
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
|
||||
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
|
||||
github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY=
|
||||
github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
|
||||
Reference in New Issue
Block a user