1use log;
16
17#[derive(Debug)]
52pub struct EventBus<T: Clone + Send + Sync + 'static> {
53 sender: flume::Sender<T>,
54 receiver: flume::Receiver<T>,
55}
56
57impl<T: Clone + Send + Sync + 'static> EventBus<T> {
58 pub fn new() -> Self {
60 let (sender, receiver) = flume::unbounded();
61 log::info!(
62 "Generic EventBus initialized for type {}.",
63 std::any::type_name::<T>()
64 );
65 Self { sender, receiver }
66 }
67
68 pub fn publish(&self, event: T) {
78 log::trace!(
79 "Publishing an event of type {}.",
80 std::any::type_name::<T>()
81 );
82
83 if let Err(e) = self.sender.send(event) {
84 log::error!("Failed to send event: {e}. Receiver likely disconnected.");
85 }
86 }
87
88 pub fn sender(&self) -> flume::Sender<T> {
94 self.sender.clone()
95 }
96
97 pub fn receiver(&self) -> &flume::Receiver<T> {
103 &self.receiver
104 }
105}
106
107impl<T: Clone + Send + Sync + 'static> Default for EventBus<T> {
108 fn default() -> Self {
109 Self::new()
110 }
111}
112
#[cfg(test)]
mod tests {
    use super::*;
    use flume::{SendError, TryRecvError};
    use std::{thread, time::Duration};

    /// Event type used by the tests; mixes struct-like and unit variants.
    #[derive(Debug, Clone, PartialEq)]
    enum TestEvent {
        WindowResized { width: u32, height: u32 },
        KeyPressed { key_code: String },
        ShutdownRequested,
    }

    /// Builds a fixed `KeyPressed` event for tests that need any event.
    fn dummy_key_event() -> TestEvent {
        TestEvent::KeyPressed {
            key_code: "Test".to_string(),
        }
    }

    /// A new bus hands out a sender and starts with nothing queued.
    #[test]
    fn event_bus_creation() {
        let bus = EventBus::<TestEvent>::new();
        let _sender = bus.sender();
        assert!(bus.receiver().is_empty());
    }

    /// `Default` yields a bus that starts with nothing queued.
    #[test]
    fn default_creates_empty_bus() {
        let bus = EventBus::<TestEvent>::default();
        assert!(bus.receiver().is_empty());
    }

    /// `publish` places the event on the bus's own channel.
    #[test]
    fn publish_delivers_event() {
        let bus = EventBus::<TestEvent>::new();
        let event = dummy_key_event();

        bus.publish(event.clone());

        assert_eq!(bus.receiver().try_recv(), Ok(event));
        // Exactly one event was published, so the queue must be empty again.
        assert_eq!(bus.receiver().try_recv(), Err(TryRecvError::Empty));
    }

    /// An event sent through a cloned sender arrives at the bus's receiver.
    #[test]
    fn send_receive_single_event() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let receiver = bus.receiver();
        let event_to_send = dummy_key_event();

        sender
            .send(event_to_send.clone())
            .expect("Send should succeed");

        match receiver.recv_timeout(Duration::from_millis(100)) {
            Ok(received_event) => assert_eq!(received_event, event_to_send),
            Err(e) => panic!("Failed to receive event: {e:?}"),
        }
    }

    /// `try_recv` on a bus with nothing sent reports `Empty`.
    #[test]
    fn try_receive_empty() {
        let bus = EventBus::<TestEvent>::new();
        let receiver = bus.receiver();

        match receiver.try_recv() {
            Err(TryRecvError::Empty) => {}
            Ok(event) => panic!("Received unexpected event: {event:?}"),
            Err(e) => panic!("Received unexpected error: {e:?}"),
        }
    }

    /// Events sent from one sender are received in the order they were sent.
    #[test]
    fn send_receive_multiple_events() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let receiver = bus.receiver();

        let event1 = TestEvent::WindowResized {
            width: 1,
            height: 1,
        };
        let event2 = dummy_key_event();
        let event3 = TestEvent::ShutdownRequested;

        sender.send(event1.clone()).expect("Send 1 should succeed");
        sender.send(event2.clone()).expect("Send 2 should succeed");
        sender.send(event3.clone()).expect("Send 3 should succeed");

        let mut received_events = Vec::new();
        for _ in 0..3 {
            match receiver.recv_timeout(Duration::from_millis(50)) {
                Ok(event) => received_events.push(event),
                Err(e) => panic!("Failed to receive event within timeout: {e:?}"),
            }
        }

        assert_eq!(received_events.len(), 3);
        assert_eq!(received_events[0], event1);
        assert_eq!(received_events[1], event2);
        assert_eq!(received_events[2], event3);

        // Nothing beyond the three sent events may be left in the queue.
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Empty));
    }

    /// Two independently cloned senders both feed the same receiver.
    #[test]
    fn multiple_senders() {
        let bus = EventBus::<TestEvent>::new();
        let sender1 = bus.sender();
        let sender2 = bus.sender();
        let receiver = bus.receiver();

        let event1 = TestEvent::WindowResized {
            width: 1,
            height: 1,
        };
        let event2 = dummy_key_event();

        sender1.send(event1.clone()).expect("Send 1 should succeed");
        sender2.send(event2.clone()).expect("Send 2 should succeed");

        let rec1 = receiver
            .recv_timeout(Duration::from_millis(50))
            .expect("Receive 1 failed");
        let rec2 = receiver
            .recv_timeout(Duration::from_millis(50))
            .expect("Receive 2 failed");

        // Either arrival order is accepted; only the set of events is checked.
        assert!((rec1 == event1 && rec2 == event2) || (rec1 == event2 && rec2 == event1));
    }

    /// A sender moved into another thread can deliver to the bus's receiver.
    #[test]
    fn send_from_thread() {
        let bus = EventBus::<TestEvent>::new();
        let sender_clone = bus.sender();
        let receiver = bus.receiver();
        let event_to_send = dummy_key_event();
        let event_clone = event_to_send.clone();

        let handle = thread::spawn(move || {
            // Delay the send so the main thread is already waiting in `recv_timeout`.
            thread::sleep(Duration::from_millis(20));
            sender_clone
                .send(event_clone)
                .expect("Send from thread failed");
            log::trace!("Event sent from spawned thread.");
        });

        log::trace!("Main thread waiting for event...");
        match receiver.recv_timeout(Duration::from_secs(1)) {
            Ok(received_event) => {
                log::trace!("Main thread received event.");
                assert_eq!(received_event, event_to_send);
            }
            Err(e) => panic!("Failed to receive event from thread: {e:?}"),
        }

        handle.join().expect("Thread join failed");
    }

    /// Once the bus (and with it the receiver) is dropped, sending fails.
    #[test]
    fn send_error_on_receiver_drop() {
        let bus = EventBus::<TestEvent>::new();
        let sender = bus.sender();
        let event_to_send = dummy_key_event();

        drop(bus);
        log::trace!("EventBus (and receiver) dropped.");

        match sender.send(event_to_send) {
            Err(SendError(_)) => {}
            Ok(()) => panic!("Send unexpectedly succeeded after receiver drop"),
        }
        log::trace!("Send correctly failed after receiver drop.");
    }
}