khora_agents/asset_agent/
agent.rs

1// Copyright 2025 eraflo
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! The AssetAgent is responsible for managing asset loading and retrieval.
16//!
17//! This agent implements the full GORNA protocol to negotiate resource budgets
18//! with the DCC and adapt loading strategies based on system constraints.
19
20use std::any::{Any, TypeId};
21use std::collections::HashMap;
22use std::fs::File;
23use std::sync::Arc;
24use std::time::Duration;
25
26use anyhow::{anyhow, Context, Result};
27use crossbeam_channel::Sender;
28use khora_core::agent::Agent;
29use khora_core::asset::{Asset, AssetHandle, AssetUUID};
30use khora_core::control::gorna::{
31    AgentId, AgentStatus, NegotiationRequest, NegotiationResponse, ResourceBudget, StrategyId,
32    StrategyOption,
33};
34use khora_core::telemetry::event::TelemetryEvent;
35use khora_core::telemetry::monitoring::GpuReport;
36use khora_core::vfs::VirtualFileSystem;
37use khora_data::assets::Assets;
38use khora_lanes::asset_lane::{AssetLoaderLane, PackLoadingLane};
39use khora_telemetry::MetricsRegistry;
40
41use super::loader::AssetLoaderLaneRegistry;
42
43/// The AssetAgent is responsible for managing asset loading and retrieval.
44pub struct AssetAgent {
45    vfs: VirtualFileSystem,
46    loading_lane: PackLoadingLane,
47    loaders: AssetLoaderLaneRegistry,
48    storages: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
49    current_strategy: StrategyId,
50    loading_budget_per_frame: usize,
51    last_load_count: usize,
52    frame_count: u64,
53    telemetry_sender: Option<Sender<TelemetryEvent>>,
54}
55
56impl Agent for AssetAgent {
57    fn id(&self) -> AgentId {
58        AgentId::Asset
59    }
60
61    fn negotiate(&mut self, _request: NegotiationRequest) -> NegotiationResponse {
62        NegotiationResponse {
63            strategies: vec![
64                StrategyOption {
65                    id: StrategyId::LowPower,
66                    estimated_time: Duration::from_micros(50),
67                    estimated_vram: 0,
68                },
69                StrategyOption {
70                    id: StrategyId::Balanced,
71                    estimated_time: Duration::from_micros(200),
72                    estimated_vram: 0,
73                },
74                StrategyOption {
75                    id: StrategyId::HighPerformance,
76                    estimated_time: Duration::from_micros(500),
77                    estimated_vram: 0,
78                },
79            ],
80        }
81    }
82
83    fn apply_budget(&mut self, budget: ResourceBudget) {
84        log::info!("AssetAgent: Strategy update to {:?}", budget.strategy_id,);
85
86        self.current_strategy = budget.strategy_id;
87
88        self.loading_budget_per_frame = match budget.strategy_id {
89            StrategyId::LowPower => 1,
90            StrategyId::Balanced => 3,
91            StrategyId::HighPerformance => 10,
92            StrategyId::Custom(factor) => (factor as usize).clamp(1, 20),
93        };
94    }
95
96    fn update(&mut self, _context: &mut khora_core::EngineContext<'_>) {
97        self.frame_count += 1;
98        self.emit_telemetry();
99    }
100
101    fn report_status(&self) -> AgentStatus {
102        let cached_assets = self.storages.len();
103
104        AgentStatus {
105            agent_id: self.id(),
106            health_score: 1.0,
107            current_strategy: self.current_strategy,
108            is_stalled: false,
109            message: format!(
110                "cached_types={} last_loads={} budget={}",
111                cached_assets, self.last_load_count, self.loading_budget_per_frame
112            ),
113        }
114    }
115
116    fn execute(&mut self) {
117        // Asset loading is performed in update() via the tactical coordination.
118    }
119
120    fn as_any(&self) -> &dyn std::any::Any {
121        self
122    }
123
124    fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
125        self
126    }
127}
128
129impl AssetAgent {
130    /// Creates a new `AssetAgent` with the given VFS and loading lane.
131    pub fn new(
132        index_bytes: &[u8],
133        data_file: File,
134        metrics_registry: Arc<MetricsRegistry>,
135    ) -> Result<Self> {
136        let vfs = VirtualFileSystem::new(index_bytes)
137            .context("Failed to initialize VirtualFileSystem from index bytes")?;
138
139        let loading_lane = PackLoadingLane::new(data_file);
140
141        Ok(Self {
142            vfs,
143            loading_lane,
144            loaders: AssetLoaderLaneRegistry::new(metrics_registry),
145            storages: HashMap::new(),
146            current_strategy: StrategyId::Balanced,
147            loading_budget_per_frame: 3,
148            last_load_count: 0,
149            frame_count: 0,
150            telemetry_sender: None,
151        })
152    }
153
154    /// Attaches a DCC sender for telemetry events.
155    pub fn with_dcc_sender(mut self, sender: Sender<TelemetryEvent>) -> Self {
156        self.telemetry_sender = Some(sender);
157        self
158    }
159
160    /// Registers an `AssetLoaderLane` for a specific asset type name.
161    pub fn register_loader<A: Asset>(
162        &mut self,
163        type_name: &str,
164        loader: impl AssetLoaderLane<A> + Send + Sync + 'static,
165    ) {
166        self.loaders.register::<A>(type_name, loader);
167    }
168
169    /// Loads, decodes, and returns a typed handle to an asset.
170    pub fn load<A: Asset>(&mut self, uuid: &AssetUUID) -> Result<AssetHandle<A>> {
171        let type_id = TypeId::of::<A>();
172
173        let storage = self
174            .storages
175            .entry(type_id)
176            .or_insert_with(|| Box::new(Assets::<A>::new()));
177
178        let assets = storage
179            .downcast_mut::<Assets<A>>()
180            .ok_or_else(|| anyhow!("Mismatched asset storage type"))?;
181
182        if let Some(handle) = assets.get(uuid) {
183            return Ok(handle.clone());
184        }
185
186        let metadata = self
187            .vfs
188            .get_metadata(uuid)
189            .ok_or_else(|| anyhow!("Asset with UUID {:?} not found in VFS", uuid))?;
190
191        let source = metadata
192            .variants
193            .get("default")
194            .ok_or_else(|| anyhow!("Asset {:?} has no 'default' variant", uuid))?;
195
196        let bytes = self.loading_lane.load_asset_bytes(source)?;
197
198        let asset: A = self.loaders.load::<A>(&metadata.asset_type_name, &bytes)?;
199
200        let handle = AssetHandle::new(asset);
201        assets.insert(*uuid, handle.clone());
202
203        self.last_load_count += 1;
204
205        Ok(handle)
206    }
207
208    fn emit_telemetry(&self) {
209        if let Some(sender) = &self.telemetry_sender {
210            let report = GpuReport {
211                frame_number: self.frame_count,
212                draw_calls: 0,
213                triangles_rendered: 0,
214                ..Default::default()
215            };
216            let _ = sender.send(TelemetryEvent::GpuReport(report));
217        }
218    }
219
220    /// Returns the current strategy.
221    pub fn current_strategy(&self) -> StrategyId {
222        self.current_strategy
223    }
224
225    /// Returns the loading budget per frame.
226    pub fn loading_budget(&self) -> usize {
227        self.loading_budget_per_frame
228    }
229}