khora_agents/ecs_agent/
garbage_collector_agent.rs

1// Copyright 2025 eraflo
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The Intelligent Subsystem Agent for garbage collection.
16//!
17//! This agent collects orphaned data locations, decides when and how much to clean
18//! based on a strategy negotiated with the DCC via GORNA, and dispatches the work
19//! to a `CompactionLane`.
20
21use std::collections::VecDeque;
22use std::time::Duration;
23
24use crossbeam_channel::Sender;
25use khora_core::agent::Agent;
26use khora_core::control::gorna::{
27    AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
28    StrategyOption,
29};
30use khora_core::telemetry::event::TelemetryEvent;
31use khora_data::ecs::{PageIndex, SemanticDomain, World, WorldMaintenance};
32use khora_lanes::ecs_lane::{CompactionLane, GcWorkPlan};
33
34const DEFAULT_MAX_CLEANUP_PER_FRAME: usize = 10;
35const HIGH_PERFORMANCE_CLEANUP_MULTIPLIER: usize = 3;
36const LOW_POWER_CLEANUP_DIVISOR: usize = 4;
37
38/// The agent responsible for garbage collection of orphaned ECS data.
39pub struct GarbageCollectorAgent {
40    pending_cleanup: VecDeque<(PageIndex, SemanticDomain)>,
41    pending_vacuum: VecDeque<(u32, u32)>,
42    compaction_lane: CompactionLane,
43    current_strategy: StrategyId,
44    max_cleanup_per_frame: usize,
45    last_cleanup_count: usize,
46    frame_count: u64,
47    telemetry_sender: Option<Sender<TelemetryEvent>>,
48}
49
50impl Agent for GarbageCollectorAgent {
51    fn id(&self) -> AgentId {
52        AgentId::Ecs
53    }
54
55    fn negotiate(&mut self, _request: NegotiationRequest) -> NegotiationResponse {
56        let pending_count = self.pending_cleanup.len() + self.pending_vacuum.len();
57        let urgency_factor = (pending_count as f32 / 100.0).min(5.0);
58
59        NegotiationResponse {
60            strategies: vec![
61                StrategyOption {
62                    id: StrategyId::LowPower,
63                    estimated_time: Duration::from_micros(50),
64                    estimated_vram: 0,
65                },
66                StrategyOption {
67                    id: StrategyId::Balanced,
68                    estimated_time: Duration::from_micros((100.0 * (1.0 + urgency_factor)) as u64),
69                    estimated_vram: 0,
70                },
71                StrategyOption {
72                    id: StrategyId::HighPerformance,
73                    estimated_time: Duration::from_micros((500.0 * (1.0 + urgency_factor)) as u64),
74                    estimated_vram: 0,
75                },
76            ],
77        }
78    }
79
80    fn apply_budget(&mut self, budget: ResourceBudget) {
81        log::info!(
82            "GarbageCollectorAgent: Strategy update to {:?}",
83            budget.strategy_id,
84        );
85
86        self.current_strategy = budget.strategy_id;
87
88        self.max_cleanup_per_frame = match budget.strategy_id {
89            StrategyId::LowPower => {
90                (DEFAULT_MAX_CLEANUP_PER_FRAME / LOW_POWER_CLEANUP_DIVISOR).max(1)
91            }
92            StrategyId::Balanced => DEFAULT_MAX_CLEANUP_PER_FRAME,
93            StrategyId::HighPerformance => {
94                DEFAULT_MAX_CLEANUP_PER_FRAME * HIGH_PERFORMANCE_CLEANUP_MULTIPLIER
95            }
96            StrategyId::Custom(factor) => (factor as usize).clamp(1, 100),
97        };
98    }
99
100    fn update(&mut self, context: &mut khora_core::EngineContext<'_>) {
101        if let Some(world_any) = context.world.as_deref_mut() {
102            if let Some(world) = world_any.downcast_mut::<World>() {
103                self.run(world);
104            }
105        }
106        self.frame_count += 1;
107    }
108
109    fn report_status(&self) -> AgentStatus {
110        let pending = self.pending_cleanup.len() + self.pending_vacuum.len();
111        let health_score = if pending == 0 {
112            1.0
113        } else if pending < 100 {
114            0.8
115        } else if pending < 500 {
116            0.5
117        } else {
118            0.2
119        };
120
121        AgentStatus {
122            agent_id: self.id(),
123            health_score,
124            current_strategy: self.current_strategy,
125            is_stalled: false,
126            message: format!(
127                "pending_cleanup={} pending_vacuum={} last_cleaned={}",
128                self.pending_cleanup.len(),
129                self.pending_vacuum.len(),
130                self.last_cleanup_count,
131            ),
132        }
133    }
134
135    fn execute(&mut self) {
136        // GC work is performed in update() via the tactical coordination.
137    }
138
139    fn as_any(&self) -> &dyn std::any::Any {
140        self
141    }
142
143    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
144        self
145    }
146}
147
148impl GarbageCollectorAgent {
149    /// Creates a new `GarbageCollectorAgent`.
150    pub fn new() -> Self {
151        Self {
152            pending_cleanup: VecDeque::new(),
153            pending_vacuum: VecDeque::new(),
154            compaction_lane: CompactionLane::new(),
155            current_strategy: StrategyId::Balanced,
156            max_cleanup_per_frame: DEFAULT_MAX_CLEANUP_PER_FRAME,
157            last_cleanup_count: 0,
158            frame_count: 0,
159            telemetry_sender: None,
160        }
161    }
162
163    /// Attaches a DCC sender for telemetry events.
164    pub fn with_dcc_sender(mut self, sender: Sender<TelemetryEvent>) -> Self {
165        self.telemetry_sender = Some(sender);
166        self
167    }
168
169    /// Adds a new orphaned data location to the cleanup queue.
170    pub fn queue_cleanup(&mut self, page_index: PageIndex, domain: SemanticDomain) {
171        self.pending_cleanup.push_back((page_index, domain));
172    }
173
174    /// Adds a vacuum request for a page hole.
175    pub fn queue_vacuum(&mut self, page_index: u32, hole_row_index: u32) {
176        self.pending_vacuum.push_back((page_index, hole_row_index));
177    }
178
179    /// Runs the agent's decision-making and execution logic for one frame.
180    pub fn run(&mut self, world: &mut World) {
181        if self.pending_cleanup.is_empty() && self.pending_vacuum.is_empty() {
182            self.last_cleanup_count = 0;
183            return;
184        }
185
186        let budget = self.max_cleanup_per_frame;
187
188        let items_to_clean: Vec<_> = self
189            .pending_cleanup
190            .drain(..budget.min(self.pending_cleanup.len()))
191            .collect();
192
193        let pages_to_vacuum: Vec<_> = self
194            .pending_vacuum
195            .drain(..budget.min(self.pending_vacuum.len()))
196            .collect();
197
198        let items_count = items_to_clean.len();
199        let pages_count = pages_to_vacuum.len();
200        self.last_cleanup_count = items_count + pages_count;
201
202        if self.last_cleanup_count == 0 {
203            return;
204        }
205
206        let work_plan = GcWorkPlan {
207            budget,
208            items_to_clean,
209            pages_to_vacuum,
210        };
211
212        self.compaction_lane
213            .run(world as &mut dyn WorldMaintenance, &work_plan);
214
215        log::trace!(
216            "GarbageCollectorAgent: Cleaned {} items, {} pages vacuumed (strategy={:?})",
217            items_count,
218            pages_count,
219            self.current_strategy,
220        );
221    }
222
223    /// Returns the total count of pending cleanup items.
224    pub fn pending_count(&self) -> usize {
225        self.pending_cleanup.len() + self.pending_vacuum.len()
226    }
227
228    /// Returns the current strategy.
229    pub fn current_strategy(&self) -> StrategyId {
230        self.current_strategy
231    }
232
233    /// Returns the maximum cleanup operations per frame.
234    pub fn max_cleanup_per_frame(&self) -> usize {
235        self.max_cleanup_per_frame
236    }
237}
238
239impl Default for GarbageCollectorAgent {
240    fn default() -> Self {
241        Self::new()
242    }
243}