[service] Run event callbacks asynchronously

This commit is contained in:
Daniel 2024-08-28 11:55:32 +02:00
parent de4cb5b34f
commit 4b2e4f208f

View file

@ -2,7 +2,6 @@
package mgr package mgr
import ( import (
"fmt"
"slices" "slices"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -107,6 +106,12 @@ func (em *EventMgr[T]) Submit(event T) {
// Run callbacks. // Run callbacks.
for _, ec := range em.callbacks { for _, ec := range em.callbacks {
// Check if callback was canceled.
if ec.canceled.Load() {
anyCanceled = true
continue
}
// Execute callback. // Execute callback.
var ( var (
cancel bool cancel bool
@ -114,29 +119,38 @@ func (em *EventMgr[T]) Submit(event T) {
) )
if em.mgr != nil { if em.mgr != nil {
// Prefer executing in worker. // Prefer executing in worker.
wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error { name := "event " + em.name + " callback " + ec.name
cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope. em.mgr.Go(name, func(w *WorkerCtx) error {
cancel, err = ec.callback(w, event)
// Handle error and cancelation.
if err != nil {
w.Warn(
"event callback failed",
"event", em.name,
"callback", ec.name,
"err", err,
)
}
if cancel {
ec.canceled.Store(true)
}
return nil return nil
}) })
if wkrErr != nil {
err = fmt.Errorf("callback execution failed: %w", wkrErr)
}
} else { } else {
cancel, err = ec.callback(nil, event) cancel, err = ec.callback(nil, event)
} // Handle error and cancelation.
if err != nil && em.mgr != nil {
// Handle error and cancelation. em.mgr.Warn(
if err != nil && em.mgr != nil { "event callback failed",
em.mgr.Warn( "event", em.name,
"event callback failed", "callback", ec.name,
"event", em.name, "err", err,
"callback", ec.name, )
"err", err, }
) if cancel {
} ec.canceled.Store(true)
if cancel { anyCanceled = true
ec.canceled.Store(true) }
anyCanceled = true
} }
} }