khora_core/event/
bus.rs

1// Copyright 2025 eraflo
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use log;
16
17/// Manages a generic, multi-producer, single-consumer (MPSC), thread-safe event channel.
18///
19/// This `EventBus` is generic over the event type `T` it transports. It serves as a
20/// foundational communication primitive within Khora, allowing different parts of the
21/// engine to communicate in a decoupled manner.
22///
23/// The design is intentional: there are many senders but only one receiver, ensuring
24/// that a single, authoritative system is responsible for processing all events of a
25/// given type. Senders can be cloned freely and passed to different threads.
26///
27/// # Examples
28///
29/// ```
30/// # use khora_core::event::EventBus;
31/// #[derive(Clone, Debug, PartialEq)]
32/// enum GameEvent {
33///     PlayerJumped,
34///     ScoreChanged(u32),
35/// }
36///
37/// // Create a new bus for our specific event type.
38/// let event_bus = EventBus::<GameEvent>::new();
39///
40/// // Clone the sender to give to a game system.
41/// let sender = event_bus.sender();
42///
43/// // A system publishes an event.
44/// sender.send(GameEvent::PlayerJumped);
45///
46/// // The main event loop (the owner of the bus) processes the event.
47/// if let Ok(event) = event_bus.receiver().try_recv() {
48///     assert_eq!(event, GameEvent::PlayerJumped);
49/// }
50/// ```
51#[derive(Debug)]
52pub struct EventBus<T: Clone + Send + Sync + 'static> {
53    sender: flume::Sender<T>,
54    receiver: flume::Receiver<T>,
55}
56
57impl<T: Clone + Send + Sync + 'static> EventBus<T> {
58    /// Creates a new `EventBus` with an unbounded channel.
59    pub fn new() -> Self {
60        let (sender, receiver) = flume::unbounded();
61        log::info!(
62            "Generic EventBus initialized for type {}.",
63            std::any::type_name::<T>()
64        );
65        Self { sender, receiver }
66    }
67
68    /// Publishes an event to all receivers.
69    ///
70    /// This method is a convenience wrapper around the channel's `send` operation.
71    /// It logs an error if the send fails, which typically means the receiver
72    /// (and thus the `EventBus` instance) has been dropped.
73    ///
74    /// # Arguments
75    ///
76    /// * `event`: The event of type `T` to be sent over the channel.
77    pub fn publish(&self, event: T) {
78        log::trace!(
79            "Publishing an event of type {}.",
80            std::any::type_name::<T>()
81        );
82
83        if let Err(e) = self.sender.send(event) {
84            log::error!("Failed to send event: {e}. Receiver likely disconnected.");
85        }
86    }
87
88    /// Returns a clone of the sender part of the channel.
89    ///
90    /// This is the primary way to allow other parts of the system to send events
91    /// without giving them ownership of the entire bus. Senders can be cloned
92    /// multiple times and sent across threads.
93    pub fn sender(&self) -> flume::Sender<T> {
94        self.sender.clone()
95    }
96
97    /// Returns a reference to the receiver part of the channel.
98    ///
99    /// This is intended for the owner of the bus (e.g., the main event loop) to
100    /// process incoming events. It returns a reference to prevent the receiver
101    /// from being moved out of the `EventBus`.
102    pub fn receiver(&self) -> &flume::Receiver<T> {
103        &self.receiver
104    }
105}
106
107impl<T: Clone + Send + Sync + 'static> Default for EventBus<T> {
108    fn default() -> Self {
109        Self::new()
110    }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use flume::{SendError, TryRecvError};
    use std::{thread, time::Duration};

    /// A local, self-contained event enum for testing purposes.
    /// This mimics the old `EngineEvent` without creating external dependencies.
    #[derive(Debug, Clone, PartialEq)]
    enum TestEvent {
        WindowResized { width: u32, height: u32 },
        KeyPressed { key_code: String },
        ShutdownRequested,
    }

    /// Builds a representative key event used as a payload by several tests.
    fn dummy_key_event() -> TestEvent {
        TestEvent::KeyPressed {
            key_code: "Test".to_string(),
        }
    }

    /// A freshly created bus hands out senders and starts with an empty queue.
    #[test]
    fn event_bus_creation() {
        let bus = EventBus::<TestEvent>::new();
        let _sender = bus.sender();
        // The receiver stays owned by the bus; it is only reachable by reference.
        assert!(bus.receiver().is_empty());
    }

    /// `Default::default` must yield the same empty bus as `new`.
    #[test]
    fn default_creates_empty_bus() {
        let bus = EventBus::<TestEvent>::default();

        assert!(bus.receiver().is_empty());
        assert_eq!(bus.receiver().try_recv(), Err(TryRecvError::Empty));
    }

    /// `publish` queues the event exactly once for the bus's receiver.
    #[test]
    fn publish_delivers_event() {
        let bus = EventBus::<TestEvent>::new();
        let event = dummy_key_event();

        bus.publish(event.clone());

        assert_eq!(bus.receiver().try_recv(), Ok(event));
        assert_eq!(bus.receiver().try_recv(), Err(TryRecvError::Empty));
    }

    /// An event sent through a cloned sender arrives unchanged at the receiver.
    #[test]
    fn send_receive_single_event() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let receiver = bus.receiver();
        let event_to_send = dummy_key_event();

        sender
            .send(event_to_send.clone())
            .expect("Send should succeed");

        match receiver.recv_timeout(Duration::from_millis(100)) {
            Ok(received_event) => assert_eq!(received_event, event_to_send),
            Err(e) => panic!("Failed to receive event: {e:?}"),
        }
    }

    /// Polling an empty bus reports `Empty` rather than blocking or disconnecting.
    #[test]
    fn try_receive_empty() {
        let bus = EventBus::<TestEvent>::new();
        let receiver = bus.receiver();

        match receiver.try_recv() {
            Err(TryRecvError::Empty) => { /* This is the expected outcome */ }
            Ok(event) => panic!("Received unexpected event: {event:?}"),
            Err(e) => panic!("Received unexpected error: {e:?}"),
        }
    }

    /// Events sent from one sender are received in the order they were sent.
    #[test]
    fn send_receive_multiple_events() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let receiver = bus.receiver();

        let event1 = TestEvent::WindowResized {
            width: 1,
            height: 1,
        };
        let event2 = dummy_key_event();
        let event3 = TestEvent::ShutdownRequested;

        sender.send(event1.clone()).expect("Send 1 should succeed");
        sender.send(event2.clone()).expect("Send 2 should succeed");
        sender.send(event3.clone()).expect("Send 3 should succeed");

        let mut received_events = Vec::new();
        for _ in 0..3 {
            match receiver.recv_timeout(Duration::from_millis(50)) {
                Ok(event) => received_events.push(event),
                Err(e) => panic!("Failed to receive event within timeout: {e:?}"),
            }
        }

        assert_eq!(received_events.len(), 3);
        assert_eq!(received_events[0], event1);
        assert_eq!(received_events[1], event2);
        assert_eq!(received_events[2], event3);

        // Nothing beyond the three sent events may be left in the queue.
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    }

    /// Independent sender clones all feed the same receiver.
    #[test]
    fn multiple_senders() {
        let bus = EventBus::<TestEvent>::new();
        let sender1 = bus.sender();
        let sender2 = bus.sender();
        let receiver = bus.receiver();

        let event1 = TestEvent::WindowResized {
            width: 1,
            height: 1,
        };
        let event2 = dummy_key_event();

        sender1.send(event1.clone()).expect("Send 1 should succeed");
        sender2.send(event2.clone()).expect("Send 2 should succeed");

        let rec1 = receiver
            .recv_timeout(Duration::from_millis(50))
            .expect("Receive 1 failed");
        let rec2 = receiver
            .recv_timeout(Duration::from_millis(50))
            .expect("Receive 2 failed");

        // Only delivery of both events is asserted, not their relative order.
        assert!((rec1 == event1 && rec2 == event2) || (rec1 == event2 && rec2 == event1));
    }

    /// A sender moved into another thread can deliver events to the bus owner.
    #[test]
    fn send_from_thread() {
        let bus = EventBus::<TestEvent>::new();
        let sender_clone = bus.sender();
        let receiver = bus.receiver();
        let event_to_send = dummy_key_event();
        let event_clone = event_to_send.clone();

        let handle = thread::spawn(move || {
            // Small delay so the main thread is already waiting when the event is sent.
            thread::sleep(Duration::from_millis(20));
            sender_clone
                .send(event_clone)
                .expect("Send from thread failed");
            log::trace!("Event sent from spawned thread.");
        });

        log::trace!("Main thread waiting for event...");
        match receiver.recv_timeout(Duration::from_secs(1)) {
            Ok(received_event) => {
                log::trace!("Main thread received event.");
                assert_eq!(received_event, event_to_send);
            }
            Err(e) => panic!("Failed to receive event from thread: {e:?}"),
        }

        handle.join().expect("Thread join failed");
    }

    /// Once the bus is dropped, outstanding senders fail and hand the event back.
    #[test]
    fn send_error_on_receiver_drop() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let event_to_send = dummy_key_event();

        drop(bus);
        log::trace!("EventBus (and receiver) dropped.");

        match sender.send(event_to_send.clone()) {
            // The rejected event is returned to the caller inside the error.
            Err(SendError(returned)) => assert_eq!(returned, event_to_send),
            Ok(()) => panic!("Send unexpectedly succeeded after receiver drop"),
        }
        log::trace!("Send correctly failed after receiver drop.");
    }
}