diff --git a/common/utils/event/.gitignore b/common/utils/event/.gitignore new file mode 100644 index 000000000..66fd13c90 --- /dev/null +++ b/common/utils/event/.gitignore @@ -0,0 +1,15 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Dependency directories (remove the comment below to include it) +# vendor/ diff --git a/common/utils/event/LICENSE b/common/utils/event/LICENSE new file mode 100644 index 000000000..f47b52e6b --- /dev/null +++ b/common/utils/event/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2023 Bogdan Gabriel Dinu aka Badu + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/common/utils/event/README.md b/common/utils/event/README.md new file mode 100644 index 000000000..0a0ca37e9 --- /dev/null +++ b/common/utils/event/README.md @@ -0,0 +1,232 @@ +# Bus + +- **Independent**: has no external dependencies +- **Probably Fast**: no reflection +- **Type Safe**: built on generics +- **Small and Simple**: can be used as following: + +Having the following event: + +```go + package events + + type InterestingEvent struct { + } +``` + +a listener can registering a handler by calling `Sub` method: + +```go + package subscriber + + import "github.com/badu/bus" + + // ... somewhere in a setup function / constructor + bus.Sub(OnMyEventOccurred) +``` + +where the handler is having the following signature: + +```go + func OnMyEventOccurred(event InterestingEvent){ + // do something with the event here + } +``` + +The event producer / dispatcher will simply: + +```go + package dispatcher + + import "github.com/badu/bus" + + // ...somewhere in a dispatching function + + bus.Pub(InterestingEvent{}) +``` + +If the event needs to go async, in the sense that the bus package will spin up a goroutine for the caller, just +implement the following interface: + +```go + package events + + func (e InterestingEvent) Async() bool{ return true } +``` + +if the handler has the signature declared above, or + +```go + package events + + func (e *InterestingEvent) Async() bool{ return true } +``` + +if the handler has the signature as following: + +```go + func OnMyEventOccurred(event *InterestingEvent){ + // do something with the event here + } +``` + +Another way to publish an event async, is to use `PubAsync` method that package exposes. + +By default, the bus is using sync events, which means that it waits for listeners to complete their jobs before calling +the next listener. + +Usage : `go get github.com/badu/bus` + +## F.A.Q. + +1. I want to cancel subscription at some point. How do I do that? + +Subscribing returns access to the `Cancel` method + +```go +package subscriber + +// ... somewhere in a setup function / constructor +subscription := bus.Sub(OnMyEventOccurred) +// when needed, calling cancel of subscription, so function OnMyEventOccurred won't be called anymore +subscription.Cancel() +``` + +2. Can I subscribe once? + +Yes! The event handler has to return true. + +```go +package subscriber +// ... somewhere in a setup function / constructor + +bus.SubCancel( func(event InterestingEvent) bool { + // do something with the event here + return true // returning true will cancel the subscription +}) +``` + +3. I want to inspect registered events. How do I do that? + +The events mapper is a `sync.Map`, so iterate using `Range` + +```go +bus.Range(func(k, v any)bool{ + fmt.Printf("%#v %#v\n", k, v) +}) +``` + +4. I want to use my own event names. Is that possible? + +Yes! You have to implement the following interface: + +```go +package events + +func (e InterestingEvent) EventID() string{ + return "YourInterestingEventName" +} +``` + +The event name is the key of the mapper, which means that implementing your own event names might cause panics +if you have name collisions. + +5. Will I have race conditions? + +No. The package is concurrent safe. + +## What Problem Does It Solve? + +Decoupling of components: publishers and subscribers can operate independently of each other, with no direct knowledge +of each other's existence. This decoupling allows for greater flexibility and scalability, as new publishers and +subscribers can be added to the system without disrupting existing components. Also, this facilitates testing by +triggering or ignoring certain events in some scenarios. + +Asynchronous messaging: messages can be sent and received asynchronously (by spinning up goroutines), which means that +publishers and subscribers don't have to wait for each other to consume their messages. This can improve performance and +response times in a system. + +Reliability: the message broker acts as a buffer between publishers and subscribers, ensuring that messages are +delivered even if one or more components in the system are temporarily unavailable. + +Modularity: the Pub-Sub pattern can be used to break a monolithic application into smaller, more modular components. +Each component can then be developed and tested independently, making the overall system easier to maintain and update. + +## Scenarios of Usage + +Inside the `test_scenarios` folder, you can find the following scenarios: + +1. Fire and Forget. + + Imagine a system / application where we have three services : `users`, `notifications` (email and + SMS) and `audit`. When a user registers, we want to send welcoming messages via SMS and email, but we also want to + audit that registration for reporting purposes. + + The [UserRegisteredEvent](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/events/main.go#L3) + will carry the freshly registered username (which is also the email) and phone to the email and sms services. The + event is [triggered](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/users/service.go#L23) by + the user service, which performs the creation of the user account. We're using the `fire and forget` technique here, + because the operation of registration should not depend on the fact that we've been able to + send a welcoming email or a sms, or the audit system malfunctions. + + Simulating audit service malfunctions easy. + Instead of using `Sub`, we're using `SubUnsub` to register the listener + and return [`true`](https://github.com/badu/bus/blob/master/test_scenarios/fire-and-forget/audit/service.go#L36) to + unsubscribe on events of that kind. + +2. Factory Request Reply + + Imagine a system / application where we need to communicate with different microservices, but in this case we don't + want to bring them online, we're just wanting to stub the response as those services were alive. + + This technique is useful when we need to test some complicated flows of business logic and facilitates the + transformation of an integration test into a classic unit test. + + The `cart` service requires two replies from two other microservices `inventory` and `prices`. In the past, I've been + using a closure function to provide the service with both real GRPC clients or with mocks and stubs. The service + signature gets complicated and large as one service would depend on a lot of GRPC clients to aggregate data. + + As you can see + the [test here](https://github.com/badu/bus/blob/master/test_scenarios/factory-request-reply/main_test.go) it's much + more elegant and the service constructor is much slimmer. + + Events are one sync and one async, just to check it works in both scenarios. + + Important to note that because a `WaitGroup` is being used in our event struct, we're forced to pass the events by + using a pointer, instead of passing them by value. + +3. Request Reply with Callback + + In this example, we wanted to achieve two things. First is that the `service` and the `repository` are decoupled by + events. More than that, we wanted that the events are generic on their own. + + The `orders` service will dispatch a generic request event, one for placing an order, which will carry an `Order` ( + model) struct with that request and another `OrderStatus` (model) struct using the same generic event. + + We are using a channel inside the generic `RequestEvent` to signal the `reply` to the publisher, which in this case + is a callback function that returns the result as if the publisher would have called directly the listener. + + I am sure that you will find this technique interesting and having a large number of applications. + +4. Request Reply with Cancellation + + Last but, not least, this is an example about providing `context.Context` along the publisher subscriber chain. + The `repository` is simulating a long database call, longer than the context's cancellation, so the service gets the + deadline exceeded error. + + Note that this final example is not using a pointer to the event struct, but it contains two properties which have + pointers, so the `service` can access the altered `reply`. + +## Recommendations + +1. always place your events inside a separate `events` package, avoiding circular dependencies. +2. in general, in `request-reply` scenarios, the events should be passed as pointers (even if it's somewhat slower), + because changing properties that represents the `reply` would not be reflected. Also, when using `sync.WaitGroup` + inside your event struct, always use method receivers and pass the event as a pointer — otherwise you will be passing + a lock by value (which is `sync.Locker`). +3. be careful if you don't want to use pointers for events, but you still need to pass values from the listener to the + dispatcher. You should still have at least one property of that event that is a pointer (see events + in `request reply with cancellation` for example). Same technique can be applied when you need `sync.Waitgroup` to be + passed around with an event that is being sent by value, not by pointer. +4. you can override the event name (which is by default, built using `fmt.Sprintf("%T", yourEvent)`) you need to + implement `EventID() string` interface. diff --git a/common/utils/event/bench_test.go b/common/utils/event/bench_test.go new file mode 100644 index 000000000..fb9dceb39 --- /dev/null +++ b/common/utils/event/bench_test.go @@ -0,0 +1,271 @@ +package bus_test + +import ( + "sync/atomic" + "testing" + + "github.com/badu/bus" +) + +type Uint32SyncEvent struct { + u uint32 +} + +type Uint32AsyncEvent struct { + u uint32 +} + +func BenchmarkBroadcast_0008Sync(b *testing.B) { + topic := bus.NewTopic[Uint32SyncEvent]() + c := uint32(0) + for i := 0; i < 8; i++ { + topic.Sub(func(v Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0008Async(b *testing.B) { + topic := bus.NewTopic[Uint32AsyncEvent]() + c := uint32(0) + for i := 0; i < 8; i++ { + topic.Sub(func(v Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0008PtrSync(b *testing.B) { + topic := bus.NewTopic[*Uint32SyncEvent]() + c := uint32(0) + for i := 0; i < 8; i++ { + topic.Sub(func(v *Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(&Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0008PtrAsync(b *testing.B) { + topic := bus.NewTopic[*Uint32AsyncEvent]() + c := uint32(0) + for i := 0; i < 8; i++ { + topic.Sub(func(v *Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(&Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0256Sync(b *testing.B) { + topic := bus.NewTopic[Uint32SyncEvent]() + var c uint32 + for i := 0; i < 256; i++ { + topic.Sub(func(v Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0256Async(b *testing.B) { + topic := bus.NewTopic[Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 256; i++ { + topic.Sub(func(v Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0256PtrSync(b *testing.B) { + topic := bus.NewTopic[*Uint32SyncEvent]() + var c uint32 + for i := 0; i < 256; i++ { + topic.Sub(func(v *Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(&Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_0256PtrAsync(b *testing.B) { + topic := bus.NewTopic[*Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 256; i++ { + topic.Sub(func(v *Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(&Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_1kSync(b *testing.B) { + topic := bus.NewTopic[Uint32SyncEvent]() + var c uint32 + for i := 0; i < 1024; i++ { + topic.Sub(func(v Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_1kAsync(b *testing.B) { + topic := bus.NewTopic[Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 1024; i++ { + topic.Sub(func(v Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_1kPtrSync(b *testing.B) { + topic := bus.NewTopic[*Uint32SyncEvent]() + var c uint32 + for i := 0; i < 1024; i++ { + topic.Sub(func(v *Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(&Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_1kPtrAsync(b *testing.B) { + topic := bus.NewTopic[*Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 1024; i++ { + topic.Sub(func(v *Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(&Uint32AsyncEvent{u: 1}) + } + }) +} +func BenchmarkBroadcast_2kSync(b *testing.B) { + topic := bus.NewTopic[Uint32SyncEvent]() + var c uint32 + for i := 0; i < 2048; i++ { + topic.Sub(func(v Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_2kAsync(b *testing.B) { + topic := bus.NewTopic[Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 2048; i++ { + topic.Sub(func(v Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(Uint32AsyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_2kPtrSync(b *testing.B) { + topic := bus.NewTopic[*Uint32SyncEvent]() + var c uint32 + for i := 0; i < 2048; i++ { + topic.Sub(func(v *Uint32SyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.Pub(&Uint32SyncEvent{u: 1}) + } + }) +} + +func BenchmarkBroadcast_2kPtrAsync(b *testing.B) { + topic := bus.NewTopic[*Uint32AsyncEvent]() + var c uint32 + for i := 0; i < 2048; i++ { + topic.Sub(func(v *Uint32AsyncEvent) { + atomic.AddUint32(&c, v.u) + }) + } + + b.RunParallel(func(p *testing.PB) { + for p.Next() { + topic.PubAsync(&Uint32AsyncEvent{u: 1}) + } + }) +} diff --git a/common/utils/event/go.mod b/common/utils/event/go.mod new file mode 100644 index 000000000..cdba3af83 --- /dev/null +++ b/common/utils/event/go.mod @@ -0,0 +1,3 @@ +module github.com/badu/bus + +go 1.20 diff --git a/common/utils/event/main.go b/common/utils/event/main.go new file mode 100644 index 000000000..3a6db21c3 --- /dev/null +++ b/common/utils/event/main.go @@ -0,0 +1,247 @@ +package bus + +import ( + "fmt" + "sync" + "sync/atomic" +) + +var mapper sync.Map // holds key (event name - string) versus topic values + +// we allow developers to override event names. They should be careful about name collisions +type iEventName interface { + EventID() string // +} + +// if developers implement this interface, we're spinning a goroutine if the event says it is async +type iAsync interface { + Async() bool +} + +// Listener is being returned when you subscribe to a topic, so you can unsubscribe or access the parent topic +type Listener[T any] struct { + parent *Topic[T] // so we can call unsubscribe from parent + callback func(event T) // the function that we're going to call +} + +// Topic keeps the subscribers of one topic +type Topic[T any] struct { + subs []*Listener[T] // list of listeners + rwMu sync.RWMutex // guards subs + lisnsPool sync.Pool // a pool of listeners +} + +// NewTopic creates a new topic for a specie of events +func NewTopic[T any]() *Topic[T] { + result := &Topic[T]{} + result.lisnsPool.New = func() any { + return &Listener[T]{ + parent: result, + } + } + return result +} + +// Sub adds a callback to be called when an event of that type is being published +func (b *Topic[T]) Sub(callback func(v T)) *Listener[T] { + result := b.lisnsPool.Get().(*Listener[T]) + result.callback = callback + result.parent = b + + b.rwMu.Lock() + b.subs = append(b.subs, result) + b.rwMu.Unlock() + + return result +} + +// cancel is private to the topic, but can be accessed via Listener +func (b *Topic[T]) cancel(who *Listener[T]) { + b.rwMu.Lock() + for i := range b.subs { + if b.subs[i] != who { + continue + } + + b.subs[i] = b.subs[len(b.subs)-1] + b.subs[len(b.subs)-1] = nil + b.subs = b.subs[:len(b.subs)-1] + break + } + b.rwMu.Unlock() + + who.callback = nil + b.lisnsPool.Put(who) +} + +// NumSubs in case you need to perform tests and check the number of subscribers of this particular topic +func (b *Topic[T]) NumSubs() int { + b.rwMu.RLock() + result := len(b.subs) + b.rwMu.RUnlock() + return result +} + +// Cancel forgets the indicated callback +func (s *Listener[T]) Cancel() { + s.parent.cancel(s) +} + +// Topic gives access to the underlying topic +func (s *Listener[T]) Topic() *Topic[T] { + return s.parent +} + +// Pub allows you to publish an event in that topic +func (b *Topic[T]) Pub(event T) { + b.rwMu.RLock() + + isAsync := false + switch m := any(event).(type) { + case iAsync: + isAsync = m.Async() + } + + for sub := range b.subs { + if isAsync { + go b.subs[sub].callback(event) + continue + } + + b.subs[sub].callback(event) + } + + b.rwMu.RUnlock() +} + +func (b *Topic[T]) PubAsync(event T) { + b.rwMu.RLock() + + for sub := range b.subs { + go b.subs[sub].callback(event) + } + + b.rwMu.RUnlock() +} + +// Bus is being returned when you subscribe, so you can manually Cancel +type Bus[T any] struct { + listener *Listener[T] + stop atomic.Uint32 // flag for unsubscribing after receiving one event +} + +// Cancel allows callers to manually unsubscribe, in case they don't want to use SubCancel +func (o *Bus[T]) Cancel() { + if o.stop.CompareAndSwap(0, 1) { + go o.listener.Cancel() + } +} + +// SubCancel can be used if you need to unsubscribe immediately after receiving an event, by making your function return true +func SubCancel[T any](callback func(event T) bool) *Bus[T] { + var ( + event T + key string + ) + + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) + if !ok || topic == nil { + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) + } + + var result Bus[T] + + result.listener = topic.(*Topic[T]).Sub(func(v T) { + if result.stop.Load() == 1 { + return + } + + shouldCancel := callback(v) + if shouldCancel { + result.Cancel() + } + + }) + + return &result +} + +// Sub subscribes a callback function to listen for a specie of events +func Sub[T any](callback func(event T)) *Bus[T] { + var ( + event T + key string + ) + + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) + if !ok || topic == nil { + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) + } + + var result Bus[T] + + result.listener = topic.(*Topic[T]).Sub(func(v T) { + if result.stop.Load() == 1 { + return + } + callback(v) + }) + + return &result +} + +// Pub publishes an event which will be dispatched to all listeners +func Pub[T any](event T) { + var key string + + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) + if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic) + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) + } + + topic.(*Topic[T]).Pub(event) +} + +// PubAsync publishes an event which will be dispatched to all listeners +func PubAsync[T any](event T) { + var key string + + switch m := any(event).(type) { + case iEventName: + key = m.EventID() + default: + key = fmt.Sprintf("%T", event) + } + + topic, ok := mapper.Load(key) + if !ok || topic == nil { // create a new topic, even if there are no listeners (otherwise we will have to panic) + topic, _ = mapper.LoadOrStore(key, NewTopic[T]()) + } + topic.(*Topic[T]).PubAsync(event) +} + +// Range gives access to mapper Range +func Range(f func(k, v any) bool) { + mapper.Range(f) +} diff --git a/common/utils/event/main_test.go b/common/utils/event/main_test.go new file mode 100644 index 000000000..61b6a88f0 --- /dev/null +++ b/common/utils/event/main_test.go @@ -0,0 +1,168 @@ +package bus_test + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/badu/bus" +) + +func TestSubTopicWhilePub(t *testing.T) { + // scenario : we have a large number of subscribers. + // we publish an event and while doing that, + // we register another one on a different goroutine + + topic := bus.NewTopic[*Uint32AsyncEvent]() + for i := 0; i < 4096; i++ { + topic.Sub(func(v *Uint32AsyncEvent) {}) + } + + finishPubWait := make(chan struct{}) + finishSubWait := make(chan struct{}) + start := make(chan struct{}) + + go func() { + <-start + topic.PubAsync(&Uint32AsyncEvent{u: 1}) + defer close(finishPubWait) + }() + + newSubCalled := false + + go func() { + <-start + topic.Sub(func(v *Uint32AsyncEvent) { + newSubCalled = true + }) + close(finishSubWait) + }() + + close(start) // start both goroutines + + <-finishPubWait // wait for pub to finish + + <-finishSubWait // wait for sub to finish + + if newSubCalled { + t.Fatal("new subscriber should not be called") + } +} + +func TestReusePayloadPointerAsync(t *testing.T) { + // if you reuse the payload, you can alter it's content, of course + + topic := bus.NewTopic[*Uint32AsyncEvent]() + c := uint32(0) + for i := 0; i < 4096; i++ { + k := i + topic.Sub(func(v *Uint32AsyncEvent) { + if v.u == 2048 { + atomic.AddUint32(&c, 1) + return + } + + if k == 2048 { + v.u = 2048 + } + }) + } + + finishPubWait := make(chan struct{}) + + payload := Uint32AsyncEvent{u: 1} + + topic.Pub(&payload) + + close(finishPubWait) + + <-finishPubWait // wait for pub to finish + + t.Logf("altered payload %d for %d listeners", payload.u, c) +} + +func TestAsyncBus(t *testing.T) { + c := uint32(0) + + var wg sync.WaitGroup + wg.Add(4096) + bus.Sub( + func(event Uint32AsyncEvent) { + atomic.AddUint32(&c, 1) + wg.Done() + }, + ) + + go func() { + for i := 0; i < 1024; i++ { + bus.PubAsync(Uint32AsyncEvent{}) + } + }() + go func() { + for i := 0; i < 1024; i++ { + bus.PubAsync(Uint32AsyncEvent{}) + } + }() + go func() { + for i := 0; i < 1024; i++ { + bus.PubAsync(Uint32AsyncEvent{}) + } + }() + go func() { + for i := 0; i < 1024; i++ { + bus.PubAsync(Uint32AsyncEvent{}) + } + }() + + wg.Wait() + + if c != 4096 { + t.Fatalf("error : counter should be 4096 but is %d", c) + } + + t.Logf("%d", c) +} + +func TestRange(t *testing.T) { + + type Event1 struct{} + type Event2 struct{} + type Event3 struct{} + type Event4 struct{} + type Event5 struct{} + + bus.Sub(func(e Event1) {}) + bus.Sub(func(e Event2) {}) + bus.Sub(func(e Event2) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event3) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event4) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + bus.Sub(func(e Event5) {}) + + seen := map[string]struct{}{ + "bus_test.Event2": {}, + "bus_test.Event3": {}, + "bus_test.Event1": {}, + "bus_test.Event5": {}, + "bus_test.Event4": {}, + } + + bus.Range(func(k, _ any) bool { + if _, has := seen[k.(string)]; has { + delete(seen, k.(string)) + } + return true + }) + + if len(seen) > 0 { + t.Fatalf("error : not all events were seen") + } +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/cart/service.go b/common/utils/event/test_scenarios/factory-request-reply/cart/service.go new file mode 100644 index 000000000..de152e5c3 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/cart/service.go @@ -0,0 +1,47 @@ +package cart + +import ( + "context" + "fmt" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/factory-request-reply/events" + "github.com/badu/bus/test_scenarios/factory-request-reply/inventory" + "github.com/badu/bus/test_scenarios/factory-request-reply/prices" +) + +type ServiceImpl struct { + sb *strings.Builder +} + +func NewService(sb *strings.Builder) ServiceImpl { + result := ServiceImpl{sb: sb} + return result +} + +func (s *ServiceImpl) AddProductToCart(ctx context.Context, productID string) error { + inventoryClientRequest := events.NewInventoryGRPCClientRequestEvent() + bus.Pub(inventoryClientRequest) + inventoryClientRequest.WaitReply() + + pricesClientRequest := events.NewPricesGRPCClientRequestEvent() + bus.Pub(pricesClientRequest) + pricesClientRequest.WaitReply() + + defer inventoryClientRequest.Conn.Close() // close GRPC connection when done + stockResponse, err := inventoryClientRequest.Client.GetStockForProduct(ctx, &inventory.ProductIDRequest{ID: productID}) + if err != nil { + return err + } + + defer pricesClientRequest.Conn.Close() // close GRPC connection when done + priceResponse, err := pricesClientRequest.Client.GetPricesForProduct(ctx, &prices.ProductIDRequest{ID: productID}) + if err != nil { + return err + } + + s.sb.WriteString(fmt.Sprintf("stock %0.2fpcs @ price %0.2f$\n", stockResponse.Stock, priceResponse.Price)) + + return nil +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/events/main.go b/common/utils/event/test_scenarios/factory-request-reply/events/main.go new file mode 100644 index 000000000..17a9c0a31 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/events/main.go @@ -0,0 +1,56 @@ +package events + +import ( + "sync" + + "github.com/badu/bus/test_scenarios/factory-request-reply/inventory" + "github.com/badu/bus/test_scenarios/factory-request-reply/prices" +) + +type InventoryGRPCClientRequestEvent struct { + wg sync.WaitGroup + Conn Closer // should be *grpc.ClientConn, but we're avoiding the import + Client inventory.ServiceClient +} + +func NewInventoryGRPCClientRequestEvent() *InventoryGRPCClientRequestEvent { + result := InventoryGRPCClientRequestEvent{} + result.wg.Add(1) + return &result +} + +func (i *InventoryGRPCClientRequestEvent) Async() bool { + return true // this one is async +} + +func (i *InventoryGRPCClientRequestEvent) WaitReply() { + i.wg.Wait() +} + +func (i *InventoryGRPCClientRequestEvent) Reply() { + i.wg.Done() +} + +type PricesGRPCClientRequestEvent struct { + wg sync.WaitGroup + Conn Closer // should be *grpc.ClientConn, but we're avoiding the import + Client prices.ServiceClient +} + +func NewPricesGRPCClientRequestEvent() *PricesGRPCClientRequestEvent { + result := PricesGRPCClientRequestEvent{} + result.wg.Add(1) + return &result +} + +func (p *PricesGRPCClientRequestEvent) WaitReply() { + p.wg.Wait() +} + +func (p *PricesGRPCClientRequestEvent) Reply() { + p.wg.Done() +} + +type Closer interface { + Close() error +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/inventory/grpc_client.go b/common/utils/event/test_scenarios/factory-request-reply/inventory/grpc_client.go new file mode 100644 index 000000000..e720cda73 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/inventory/grpc_client.go @@ -0,0 +1,17 @@ +package inventory + +import ( + "context" +) + +type ServiceClient interface { + GetStockForProduct(ctx context.Context, in *ProductIDRequest) (*ProductStockResponse, error) // , opts ...grpc.CallOption) (*ProductStockResponse, error) +} + +type ProductIDRequest struct { + ID string +} + +type ProductStockResponse struct { + Stock float64 +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/inventory/service.go b/common/utils/event/test_scenarios/factory-request-reply/inventory/service.go new file mode 100644 index 000000000..00c699984 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/inventory/service.go @@ -0,0 +1,18 @@ +package inventory + +import ( + "context" +) + +type ServiceImpl struct { +} + +func NewService() ServiceImpl { + result := ServiceImpl{} + + return result +} + +func (s *ServiceImpl) GetStockForProduct(ctx context.Context, productID string) { + +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/main_test.go b/common/utils/event/test_scenarios/factory-request-reply/main_test.go new file mode 100644 index 000000000..54488c86e --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/main_test.go @@ -0,0 +1,81 @@ +package factory_request_reply + +import ( + "context" + "strings" + "testing" + "time" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/factory-request-reply/cart" + "github.com/badu/bus/test_scenarios/factory-request-reply/events" + "github.com/badu/bus/test_scenarios/factory-request-reply/inventory" + "github.com/badu/bus/test_scenarios/factory-request-reply/prices" +) + +var sb strings.Builder + +type pricesClientStub struct{} + +func (s *pricesClientStub) GetPricesForProduct(ctx context.Context, in *prices.ProductIDRequest) (*prices.ProductPriceResponse, error) { + return &prices.ProductPriceResponse{Price: 10.30}, nil +} + +type fakeCloser struct { +} + +func (f *fakeCloser) Close() error { + return nil +} + +func OnPricesGRPCClientStubRequest(e *events.PricesGRPCClientRequestEvent) { + sb.WriteString("OnPricesGRPCClientStubRequest\n") + e.Client = &pricesClientStub{} + e.Conn = &fakeCloser{} + <-time.After(300 * time.Millisecond) + e.Reply() +} + +type inventoryClientStub struct{} + +func (s *inventoryClientStub) GetStockForProduct(ctx context.Context, in *inventory.ProductIDRequest) (*inventory.ProductStockResponse, error) { + return &inventory.ProductStockResponse{Stock: 200}, nil +} + +func OnInventoryGRPCClientStubRequest(e *events.InventoryGRPCClientRequestEvent) { + sb.WriteString("OnInventoryGRPCClientStubRequest\n") + e.Client = &inventoryClientStub{} + e.Conn = &fakeCloser{} + <-time.After(300 * time.Millisecond) + e.Reply() +} + +func TestGRPCClientStub(t *testing.T) { + + cartSvc := cart.NewService(&sb) + + bus.Sub(OnInventoryGRPCClientStubRequest) + bus.Sub(OnPricesGRPCClientStubRequest) + + err := cartSvc.AddProductToCart(context.Background(), "1") + if err != nil { + t.Fatalf("error adding product to cart : %#v", err) + } + + err = cartSvc.AddProductToCart(context.Background(), "2") + if err != nil { + t.Fatalf("error adding product to cart : %#v", err) + } + + const expecting = "OnInventoryGRPCClientStubRequest\n" + + "OnPricesGRPCClientStubRequest\n" + + "stock 200.00pcs @ price 10.30$\n" + + "OnInventoryGRPCClientStubRequest\n" + + "OnPricesGRPCClientStubRequest\n" + + "stock 200.00pcs @ price 10.30$\n" + + got := sb.String() + if got != expecting { + t.Fatalf("expecting :\n%s but got : \n%s", expecting, got) + } +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/prices/grpc_client.go b/common/utils/event/test_scenarios/factory-request-reply/prices/grpc_client.go new file mode 100644 index 000000000..27283bd41 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/prices/grpc_client.go @@ -0,0 +1,17 @@ +package prices + +import ( + "context" +) + +type ServiceClient interface { + GetPricesForProduct(ctx context.Context, in *ProductIDRequest) (*ProductPriceResponse, error) // , opts ...grpc.CallOption) (*ProductPriceResponse, error) +} + +type ProductIDRequest struct { + ID string +} + +type ProductPriceResponse struct { + Price float64 +} diff --git a/common/utils/event/test_scenarios/factory-request-reply/prices/service.go b/common/utils/event/test_scenarios/factory-request-reply/prices/service.go new file mode 100644 index 000000000..d66cc3ac0 --- /dev/null +++ b/common/utils/event/test_scenarios/factory-request-reply/prices/service.go @@ -0,0 +1,18 @@ +package prices + +import ( + "context" +) + +type ServiceImpl struct { +} + +func NewService() ServiceImpl { + result := ServiceImpl{} + + return result +} + +func (s *ServiceImpl) GetPricesForProduct(ctx context.Context, productID string) { + +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/audit/service.go b/common/utils/event/test_scenarios/fire-and-forget/audit/service.go new file mode 100644 index 000000000..a1a51e024 --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/audit/service.go @@ -0,0 +1,37 @@ +package audit + +import ( + "fmt" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" +) + +type ServiceImpl struct { + sb *strings.Builder +} + +func NewAuditService(sb *strings.Builder) ServiceImpl { + result := ServiceImpl{sb: sb} + bus.Sub(result.OnUserRegisteredEvent) + bus.SubCancel(result.OnSMSRequestEvent) + bus.SubCancel(result.OnSMSSentEvent) + return result +} + +// OnUserRegisteredEvent is classic event handler +func (s *ServiceImpl) OnUserRegisteredEvent(event events.UserRegisteredEvent) { + // we can save audit data here +} + +// OnSMSRequestEvent is a pub-unsub type, we have to return 'false' to continue listening for this kind of events +func (s *ServiceImpl) OnSMSRequestEvent(event events.SMSRequestEvent) bool { + return false +} + +// OnSMSSentEvent is a pub-unsub type where we give up on listening after receiving first message +func (s *ServiceImpl) OnSMSSentEvent(event events.SMSSentEvent) bool { + s.sb.WriteString(fmt.Sprintf("audit event : an sms was %s sent to %s with message %s\n", event.Status, event.Request.Number, event.Request.Message)) + return true // after first event, audit will give up listening for events +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/events/main.go b/common/utils/event/test_scenarios/fire-and-forget/events/main.go new file mode 100644 index 000000000..1c1c0fe64 --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/events/main.go @@ -0,0 +1,36 @@ +package events + +type UserRegisteredEvent struct { + UserName string + Phone string +} + +func (e UserRegisteredEvent) Async() bool { + return true +} + +type SMSRequestEvent struct { + Number string + Message string +} + +func (e SMSRequestEvent) Async() bool { + return true +} + +type SMSSentEvent struct { + Request SMSRequestEvent + Status string +} + +func (e SMSSentEvent) Async() bool { + return true +} + +type DummyEvent struct { + AlteredAsync bool +} + +func (e *DummyEvent) Async() bool { + return e.AlteredAsync +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/main_test.go b/common/utils/event/test_scenarios/fire-and-forget/main_test.go new file mode 100644 index 000000000..d54c72a18 --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/main_test.go @@ -0,0 +1,49 @@ +package fire_and_forget + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/fire-and-forget/audit" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" + "github.com/badu/bus/test_scenarios/fire-and-forget/notifications" + "github.com/badu/bus/test_scenarios/fire-and-forget/users" +) + +func OnDummyEvent(event *events.DummyEvent) { + fmt.Println("dummy event async ?", event.Async()) +} + +func TestUserRegistration(t *testing.T) { + var sb strings.Builder + + userSvc := users.NewService(&sb) + notifications.NewSmsService(&sb) + notifications.NewEmailService(&sb) + audit.NewAuditService(&sb) + + bus.Sub(OnDummyEvent) + + userSvc.RegisterUser(context.Background(), "Badu", "+40742222222") + + <-time.After(500 * time.Millisecond) + + userSvc.RegisterUser(context.Background(), "Adina", "+40743333333") + + <-time.After(500 * time.Millisecond) + + const expecting = "user Badu has registered - sending welcome email message\n" + + "sms sent requested for number +40742222222 with message Badu your user account was created. Check your email for instructions\n" + + "audit event : an sms was successfully sent sent to +40742222222 with message Badu your user account was created. Check your email for instructions\n" + + "user Adina has registered - sending welcome email message\n" + + "sms sent requested for number +40743333333 with message Adina your user account was created. Check your email for instructions\n" + + got := sb.String() + if got != expecting { + t.Fatalf("expecting :\n%s but got : \n%s", expecting, got) + } +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/notifications/email_service.go b/common/utils/event/test_scenarios/fire-and-forget/notifications/email_service.go new file mode 100644 index 000000000..6cfd4620f --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/notifications/email_service.go @@ -0,0 +1,27 @@ +package notifications + +import ( + "fmt" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" +) + +type EmailServiceImpl struct { + sb *strings.Builder +} + +func NewEmailService(sb *strings.Builder) EmailServiceImpl { + result := EmailServiceImpl{sb: sb} + bus.Sub(result.OnUserRegisteredEvent) + return result +} + +func (s *EmailServiceImpl) OnUserRegisteredEvent(e events.UserRegisteredEvent) { + s.sb.WriteString(fmt.Sprintf("user %s has registered - sending welcome email message\n", e.UserName)) + bus.Pub(events.SMSRequestEvent{ + Number: e.Phone, + Message: fmt.Sprintf("%s your user account was created. Check your email for instructions", e.UserName), + }) +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/notifications/sms_service.go b/common/utils/event/test_scenarios/fire-and-forget/notifications/sms_service.go new file mode 100644 index 000000000..2b99d603d --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/notifications/sms_service.go @@ -0,0 +1,27 @@ +package notifications + +import ( + "fmt" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" +) + +type SmsServiceImpl struct { + sb *strings.Builder +} + +func NewSmsService(sb *strings.Builder) SmsServiceImpl { + result := SmsServiceImpl{sb: sb} + bus.Sub(result.OnSMSSendRequest) + return result +} + +func (s *SmsServiceImpl) OnSMSSendRequest(event events.SMSRequestEvent) { + s.sb.WriteString(fmt.Sprintf("sms sent requested for number %s with message %s\n", event.Number, event.Message)) + bus.Pub(events.SMSSentEvent{ + Request: event, + Status: "successfully sent", + }) +} diff --git a/common/utils/event/test_scenarios/fire-and-forget/users/service.go b/common/utils/event/test_scenarios/fire-and-forget/users/service.go new file mode 100644 index 000000000..3a1be56fa --- /dev/null +++ b/common/utils/event/test_scenarios/fire-and-forget/users/service.go @@ -0,0 +1,25 @@ +package users + +import ( + "context" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/fire-and-forget/events" +) + +type ServiceImpl struct { + sb *strings.Builder + c int +} + +func NewService(sb *strings.Builder) ServiceImpl { + result := ServiceImpl{sb: sb} + return result +} + +func (s *ServiceImpl) RegisterUser(ctx context.Context, name, phone string) { + s.c++ + bus.Pub(events.UserRegisteredEvent{UserName: name, Phone: phone}) + bus.Pub(&events.DummyEvent{AlteredAsync: s.c%2 == 0}) // nobody listens on this one +} diff --git a/common/utils/event/test_scenarios/request-reply-callback/events/main.go b/common/utils/event/test_scenarios/request-reply-callback/events/main.go new file mode 100644 index 000000000..b38217842 --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-callback/events/main.go @@ -0,0 +1,18 @@ +package events + +type RequestEvent[T any] struct { + Payload T + Callback func() (*T, error) + Done chan struct{} +} + +func NewRequestEvent[T any](payload T) *RequestEvent[T] { + return &RequestEvent[T]{ + Payload: payload, + Done: make(chan struct{}), + } +} + +func (i *RequestEvent[T]) Async() bool { + return true // this one is async +} diff --git a/common/utils/event/test_scenarios/request-reply-callback/main_test.go b/common/utils/event/test_scenarios/request-reply-callback/main_test.go new file mode 100644 index 000000000..b1ba28e40 --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-callback/main_test.go @@ -0,0 +1,59 @@ +package request_reply_callback + +import ( + "context" + "strings" + "testing" + + "github.com/badu/bus/test_scenarios/request-reply-callback/orders" +) + +func TestRequestReplyCallback(t *testing.T) { + var sb strings.Builder + orders.NewRepository(&sb) + svc := orders.NewService(&sb) + + ctx := context.Background() + + newOrder0, err := svc.RegisterOrder(ctx, []int{1, 2, 3}) + if err != nil { + t.Fatalf("error creating order : %#v", err) + } + + t.Logf("new order #0 : %#v", newOrder0) + + newOrder1, err := svc.RegisterOrder(ctx, []int{4, 5, 6}) + if err != nil { + t.Fatalf("error creating order : %#v", err) + } + + t.Logf("new order #1 : %#v", newOrder1) + newOrder2, err := svc.RegisterOrder(ctx, []int{7, 8, 9}) + if err != nil { + t.Fatalf("error creating order : %#v", err) + } + + t.Logf("new order #2 : %#v", newOrder2) + + stat0, err := svc.GetOrderStatus(ctx, newOrder0.OrderID) + if err != nil { + t.Fatalf("error getting order status : %#v", err) + } + t.Logf("order #0 status : %s", stat0.Status) + + stat1, err := svc.GetOrderStatus(ctx, newOrder1.OrderID) + if err != nil { + t.Fatalf("error getting order status : %#v", err) + } + + t.Logf("order #1 status : %s", stat1.Status) + + stat2, err := svc.GetOrderStatus(ctx, newOrder2.OrderID) + if err != nil { + t.Fatalf("error getting order status : %#v", err) + } + t.Logf("order #2 status : %s", stat2.Status) + + t.Logf("%s", sb.String()) + +} diff --git a/common/utils/event/test_scenarios/request-reply-callback/orders/repository.go b/common/utils/event/test_scenarios/request-reply-callback/orders/repository.go new file mode 100644 index 000000000..620ae171e --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-callback/orders/repository.go @@ -0,0 +1,57 @@ +package orders + +import ( + "strings" + "time" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/request-reply-callback/events" +) + +type Order struct { + OrderID int + ProductIDs []int +} + +type OrderStatus struct { + OrderID int + Status string +} + +type RepositoryImpl struct { + sb *strings.Builder + calls int +} + +func NewRepository(sb *strings.Builder) RepositoryImpl { + result := RepositoryImpl{sb: sb} + bus.Sub(result.onCreateOrder) + bus.Sub(result.onGetOrderStatus) + return result +} + +func (r *RepositoryImpl) onCreateOrder(event *events.RequestEvent[Order]) { + defer func() { r.calls++ }() + + <-time.After(500 * time.Millisecond) // simulate heavy database call + + event.Callback = func() (*Order, error) { + return &Order{OrderID: r.calls, ProductIDs: event.Payload.ProductIDs}, nil + } + + close(event.Done) +} + +func (r *RepositoryImpl) onGetOrderStatus(event *events.RequestEvent[OrderStatus]) { + <-time.After(300 * time.Millisecond) // simulate heavy database call + + event.Callback = func() (*OrderStatus, error) { + status := "in_progress" + if event.Payload.OrderID == 3 { + status = "cancelled" + } + return &OrderStatus{OrderID: event.Payload.OrderID, Status: status}, nil + } + + close(event.Done) +} diff --git a/common/utils/event/test_scenarios/request-reply-callback/orders/service.go b/common/utils/event/test_scenarios/request-reply-callback/orders/service.go new file mode 100644 index 000000000..52873037c --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-callback/orders/service.go @@ -0,0 +1,35 @@ +package orders + +import ( + "context" + "fmt" + "strings" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/request-reply-callback/events" +) + +type ServiceImpl struct { + sb *strings.Builder +} + +func NewService(sb *strings.Builder) ServiceImpl { + result := ServiceImpl{sb: sb} + return result +} + +func (s *ServiceImpl) RegisterOrder(ctx context.Context, productIDs []int) (*Order, error) { + event := events.NewRequestEvent[Order](Order{ProductIDs: productIDs}) + s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event)) + bus.Pub(event) + <-event.Done // wait for "reply" + return event.Callback() // return the callback, which is containing the actual result +} + +func (s *ServiceImpl) GetOrderStatus(ctx context.Context, orderID int) (*OrderStatus, error) { + event := events.NewRequestEvent[OrderStatus](OrderStatus{OrderID: orderID}) + s.sb.WriteString(fmt.Sprintf("dispatching event typed %T\n", event)) + bus.Pub(event) + <-event.Done // wait for "reply" + return event.Callback() // return the callback, which is containing the actual result +} diff --git a/common/utils/event/test_scenarios/request-reply-with-cancellation/events/main.go b/common/utils/event/test_scenarios/request-reply-with-cancellation/events/main.go new file mode 100644 index 000000000..94f4fa2cc --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-with-cancellation/events/main.go @@ -0,0 +1,33 @@ +package events + +import ( + "context" +) + +type EventState struct { + Ctx context.Context + Done chan struct{} `json:"-"` + Error error +} + +func NewEventState(ctx context.Context) *EventState { + return &EventState{ + Ctx: ctx, + Done: make(chan struct{}), + } +} + +func (s *EventState) Close() { + s.Error = s.Ctx.Err() + close(s.Done) +} + +type NewOrder struct { + ID int +} + +type CreateOrderEvent struct { + NewOrder *NewOrder + ProductIDs []int + State *EventState +} diff --git a/common/utils/event/test_scenarios/request-reply-with-cancellation/main_test.go b/common/utils/event/test_scenarios/request-reply-with-cancellation/main_test.go new file mode 100644 index 000000000..9490fbc19 --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-with-cancellation/main_test.go @@ -0,0 +1,25 @@ +package request_reply + +import ( + "context" + "testing" + "time" + + "github.com/badu/bus/test_scenarios/request-reply-with-cancellation/orders" +) + +func TestRequestReplyWithCancellation(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*1) + svc := orders.NewService() + orders.NewRepository() + + response, err := svc.CreateOrder(ctx, []int{1, 2, 3}) + switch err { + default: + t.Fatalf("error : it supposed to timeout, but it responded %#v and the error is %#v", response, err) + case context.DeadlineExceeded: + // what we were expecting + } + + cancel() +} diff --git a/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/repository.go b/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/repository.go new file mode 100644 index 000000000..3280fb313 --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/repository.go @@ -0,0 +1,41 @@ +package orders + +import ( + "time" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/request-reply-with-cancellation/events" +) + +type Order struct { + ID int + ProductIDs []int +} + +type RepositoryImpl struct { + calls int +} + +func NewRepository() RepositoryImpl { + result := RepositoryImpl{} + bus.Sub(result.OnCreateOrder) + return result +} + +func (r *RepositoryImpl) OnCreateOrder(event events.CreateOrderEvent) { + defer func() { + r.calls++ + }() + + for { + select { + case <-time.After(4 * time.Second): + event.NewOrder = &events.NewOrder{ID: r.calls} + event.State.Close() + return + case <-event.State.Ctx.Done(): + event.State.Close() + return + } + } +} diff --git a/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/service.go b/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/service.go new file mode 100644 index 000000000..8ea63dbfd --- /dev/null +++ b/common/utils/event/test_scenarios/request-reply-with-cancellation/orders/service.go @@ -0,0 +1,28 @@ +package orders + +import ( + "context" + + "github.com/badu/bus" + "github.com/badu/bus/test_scenarios/request-reply-with-cancellation/events" +) + +type ServiceImpl struct { +} + +func NewService() ServiceImpl { + result := ServiceImpl{} + return result +} + +func (s *ServiceImpl) CreateOrder(ctx context.Context, productIDs []int) (*Order, error) { + event := events.CreateOrderEvent{State: events.NewEventState(ctx), ProductIDs: productIDs, NewOrder: &events.NewOrder{}} + bus.Pub(event) + <-event.State.Done + + if event.NewOrder != nil && event.State.Error == nil { + return &Order{ID: event.NewOrder.ID, ProductIDs: productIDs}, nil + } + + return nil, event.State.Error +} diff --git a/common/utils/sqrt_test.go b/common/utils/sqrt_test.go index 95d0b55e2..20c452a1b 100644 --- a/common/utils/sqrt_test.go +++ b/common/utils/sqrt_test.go @@ -4,6 +4,8 @@ import ( "fmt" "math" "testing" + + "github.com/badu/bus" ) func Test_fastSqrt(t *testing.T) { @@ -89,3 +91,78 @@ func BenchmarkFastSqrtInt(b *testing.B) { } } } + +func Test_fastSqr1(b *testing.T) { + // scenario : we have a large number of subscribers. + // we publish an event and while doing that, + // we register another one on a different goroutine + + topic := bus.NewTopic[*Uint32AsyncEvent]() + for i := 0; i < 4096; i++ { + topic.Sub(func(v *Uint32AsyncEvent) { + print(v.u) + }) + } + + topic.Pub(&Uint32AsyncEvent{u: 1}) + // finishPubWait := make(chan struct{}) + // finishSubWait := make(chan struct{}) + // start := make(chan struct{}) + + // go func() { + // <-start + // topic.PubAsync(&Uint32AsyncEvent{u: 1}) + // defer close(finishPubWait) + // }() + + // newSubCalled := false + + // go func() { + // <-start + // topic.Sub(func(v *Uint32AsyncEvent) { + // newSubCalled = true + // }) + // close(finishSubWait) + // }() + + // close(start) // start both goroutines + + // <-finishPubWait // wait for pub to finish + + // <-finishSubWait // wait for sub to finish + + // if newSubCalled { + // print("new subscriber called") + // } + +} + +// Various event types +const EventA = 0x01 + +type Uint32AsyncEvent struct { + u uint32 +} + +// Event type for testing purposes +type Event struct { + Data string + type1 uint32 +} + +// Type returns the event type +func (ev Event) Type() uint32 { + return ev.type1 +} + +// newEventA creates a new instance of an event +func newEventA(data string) Event { + return Event{Data: data, type1: EventA} +} + +// Various event types +const EventB1 = 0x02 + +func newEventB(data string) Event { + return Event{Data: data, type1: EventB1} +} diff --git a/go.work b/go.work index f3f675779..494d52b73 100644 --- a/go.work +++ b/go.work @@ -8,6 +8,7 @@ use ( ./common/cool ./common/utils/bitset ./common/utils/bytearray + ./common/utils/event ./common/utils/go-jsonrpc ./common/utils/log ./common/utils/sturc diff --git a/login/go.mod b/login/go.mod index 4cc3b9397..d481da415 100644 --- a/login/go.mod +++ b/login/go.mod @@ -11,6 +11,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect + github.com/kelindar/event v1.5.2 // indirect github.com/kr/pretty v0.3.0 // indirect github.com/kr/text v0.2.0 // indirect github.com/panjf2000/ants/v2 v2.11.3 // indirect diff --git a/login/go.sum b/login/go.sum index f34d6f9ae..2fc747fbc 100644 --- a/login/go.sum +++ b/login/go.sum @@ -47,6 +47,8 @@ github.com/jackc/pgx/v5 v5.5.3/go.mod h1:ez9gk+OAat140fv9ErkZDYFWmXLfV+++K0uAOiw github.com/jackc/puddle/v2 v2.2.1/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/kelindar/event v1.5.2 h1:qtgssZqMh/QQMCIxlbx4wU3DoMHOrJXKdiZhphJ4YbY= +github.com/kelindar/event v1.5.2/go.mod h1:UxWPQjWK8u0o9Z3ponm2mgREimM95hm26/M9z8F488Q= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=