]> git.proxmox.com Git - proxmox-backup.git/blob - src/api2/backup.rs
6da65d3b93a90c8eca3f31b0c136aaa9de2a5b89
[proxmox-backup.git] / src / api2 / backup.rs
1 use anyhow::{bail, format_err, Error};
2 use futures::*;
3 use hyper::header::{HeaderValue, UPGRADE};
4 use hyper::http::request::Parts;
5 use hyper::{Body, Response, StatusCode};
6 use serde_json::{json, Value};
7
8 use proxmox::{sortable, identity, list_subdirs_api_method};
9 use proxmox::api::{ApiResponseFuture, ApiHandler, ApiMethod, Router, RpcEnvironment, Permission};
10 use proxmox::api::router::SubdirMap;
11 use proxmox::api::schema::*;
12
13 use crate::tools::{self, WrappedReaderStream};
14 use crate::server::{WorkerTask, H2Service};
15 use crate::backup::*;
16 use crate::api2::types::*;
17 use crate::config::acl::PRIV_DATASTORE_BACKUP;
18 use crate::config::cached_user_info::CachedUserInfo;
19
20 mod environment;
21 use environment::*;
22
23 mod upload_chunk;
24 use upload_chunk::*;
25
/// Entry router for `/api2/backup`: the only operation offered here is the
/// protocol upgrade that switches the connection into backup mode.
pub const ROUTER: Router = Router::new()
    .upgrade(&API_METHOD_UPGRADE_BACKUP);
28
/// API method descriptor for the backup protocol upgrade.
///
/// Declares the parameters a client must send (`store`, `backup-type`,
/// `backup-id`, `backup-time`, optional `debug`) before the HTTP connection
/// is upgraded to the proxmox backup protocol.
#[sortable]
pub const API_METHOD_UPGRADE_BACKUP: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&upgrade_to_backup_protocol),
    &ObjectSchema::new(
        concat!("Upgraded to backup protocol ('", PROXMOX_BACKUP_PROTOCOL_ID_V1!(), "')."),
        &sorted!([
            ("store", false, &DATASTORE_SCHEMA),
            ("backup-type", false, &BACKUP_TYPE_SCHEMA),
            ("backup-id", false, &BACKUP_ID_SCHEMA),
            ("backup-time", false, &BACKUP_TIME_SCHEMA),
            ("debug", true, &BooleanSchema::new("Enable verbose debug logging.").schema()),
        ]),
    )
).access(
    // Note: parameter 'store' is no uri parameter, so we need to test inside function body
    Some("The user needs Datastore.Backup privilege on /datastore/{store} and needs to own the backup group."),
    &Permission::Anybody
);
47
/// Upgrade the incoming HTTP/1.x request to a backup-protocol session.
///
/// Flow (all visible below):
/// 1. check `Datastore.Backup` privilege on `/datastore/{store}`,
/// 2. validate the `Upgrade` header and HTTP version (must be < 2),
/// 3. create/lookup the backup group and verify the caller owns it,
/// 4. reject timestamps not newer than the last snapshot,
/// 5. create the new snapshot directory (must not already exist),
/// 6. spawn a `WorkerTask` that serves `BACKUP_API_ROUTER` over a private
///    HTTP/2 connection on the upgraded socket,
/// 7. reply `101 Switching Protocols` to the client.
///
/// The worker's final state depends on both the connection result and
/// `env.ensure_finished()`: an unfinished or failed backup is removed again.
fn upgrade_to_backup_protocol(
    parts: Parts,
    req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let debug = param["debug"].as_bool().unwrap_or(false);

        let username = rpcenv.get_user().unwrap();

        let store = tools::required_string_param(&param, "store")?.to_owned();

        // 'store' is not part of the URI, so the ACL check happens here
        let user_info = CachedUserInfo::new()?;
        user_info.check_privs(&username, &["datastore", &store], PRIV_DATASTORE_BACKUP, false)?;

        let datastore = DataStore::lookup_datastore(&store)?;

        let backup_type = tools::required_string_param(&param, "backup-type")?;
        let backup_id = tools::required_string_param(&param, "backup-id")?;
        let backup_time = tools::required_integer_param(&param, "backup-time")?;

        let protocols = parts
            .headers
            .get("UPGRADE")
            .ok_or_else(|| format_err!("missing Upgrade header"))?
            .to_str()?;

        if protocols != PROXMOX_BACKUP_PROTOCOL_ID_V1!() {
            bail!("invalid protocol name");
        }

        // the upgrade handshake only works on HTTP/1.x connections
        if parts.version >= http::version::Version::HTTP_2 {
            bail!("unexpected http version '{:?}' (expected version < 2)", parts.version);
        }

        let worker_id = format!("{}_{}_{}", store, backup_type, backup_id);

        let env_type = rpcenv.env_type();

        // creates the group if necessary and returns the (possibly pre-existing) owner
        let backup_group = BackupGroup::new(backup_type, backup_id);
        let owner = datastore.create_backup_group(&backup_group, &username)?;
        // permission check
        if owner != username { // only the owner is allowed to create additional snapshots
            bail!("backup owner check failed ({} != {})", username, owner);
        }

        let last_backup = BackupInfo::last_backup(&datastore.base_path(), &backup_group).unwrap_or(None);
        let backup_dir = BackupDir::new_with_group(backup_group, backup_time);

        if let Some(last) = &last_backup {
            // enforce strictly increasing snapshot timestamps within a group
            if backup_dir.backup_time() <= last.backup_dir.backup_time() {
                bail!("backup timestamp is older than last backup.");
            }
            // fixme: abort if last backup is still running - how to test?
            // Idea: write upid into a file inside snapshot dir. then test if
            // it is still running here.
        }

        let (path, is_new) = datastore.create_backup_dir(&backup_dir)?;
        if !is_new { bail!("backup directory already exists."); }

        WorkerTask::spawn("backup", Some(worker_id), &username.clone(), true, move |worker| {
            // per-connection backup state, shared with the H2 service below
            let mut env = BackupEnvironment::new(
                env_type, username.clone(), worker.clone(), datastore, backup_dir);

            env.debug = debug;
            env.last_backup = last_backup;

            env.log(format!("starting new backup on datastore '{}': {:?}", store, path));

            let service = H2Service::new(env.clone(), worker.clone(), &BACKUP_API_ROUTER, debug);

            let abort_future = worker.abort_future();

            let env2 = env.clone();

            // once hyper hands us the raw upgraded socket, serve HTTP/2 on it
            let mut req_fut = req_body
                .on_upgrade()
                .map_err(Error::from)
                .and_then(move |conn| {
                    env2.debug("protocol upgrade done");

                    let mut http = hyper::server::conn::Http::new();
                    http.http2_only(true);
                    // increase window size: todo - find optimal size
                    let window_size = 32*1024*1024; // max = (1 << 31) - 2
                    http.http2_initial_stream_window_size(window_size);
                    http.http2_initial_connection_window_size(window_size);

                    http.serve_connection(conn, service)
                        .map_err(Error::from)
                });
            let mut abort_future = abort_future
                .map(|_| Err(format_err!("task aborted")));

            async move {
                // run until either the connection ends or the task is aborted
                let res = select!{
                    req = req_fut => req,
                    abrt = abort_future => abrt,
                };

                // a backup only survives if the client called 'finish';
                // otherwise the partial snapshot is removed again
                match (res, env.ensure_finished()) {
                    (Ok(_), Ok(())) => {
                        env.log("backup finished successfully");
                        Ok(())
                    },
                    (Err(err), Ok(())) => {
                        // ignore errors after finish
                        env.log(format!("backup had errors but finished: {}", err));
                        Ok(())
                    },
                    (Ok(_), Err(err)) => {
                        env.log(format!("backup ended and finish failed: {}", err));
                        env.log("removing unfinished backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                    (Err(err), Err(_)) => {
                        env.log(format!("backup failed: {}", err));
                        env.log("removing failed backup");
                        env.remove_backup()?;
                        Err(err)
                    },
                }
            }
        })?;

        let response = Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .header(UPGRADE, HeaderValue::from_static(PROXMOX_BACKUP_PROTOCOL_ID_V1!()))
            .body(Body::empty())?;

        Ok(response)
    }.boxed()
}
186
/// Sub-routes served on the upgraded (HTTP/2) backup connection.
///
/// NOTE(review): entries are kept in alphabetical order — presumably required
/// by `list_subdirs_api_method!`; confirm before reordering.
pub const BACKUP_API_SUBDIRS: SubdirMap = &[
    (
        "blob", &Router::new()
            .upload(&API_METHOD_UPLOAD_BLOB)
    ),
    (
        "dynamic_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_DYNAMIC_CHUNK)
    ),
    (
        "dynamic_close", &Router::new()
            .post(&API_METHOD_CLOSE_DYNAMIC_INDEX)
    ),
    (
        "dynamic_index", &Router::new()
            .download(&API_METHOD_DYNAMIC_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_DYNAMIC_INDEX)
            .put(&API_METHOD_DYNAMIC_APPEND)
    ),
    (
        "finish", &Router::new()
            .post(
                &ApiMethod::new(
                    &ApiHandler::Sync(&finish_backup),
                    &ObjectSchema::new("Mark backup as finished.", &[])
                )
            )
    ),
    (
        "fixed_chunk", &Router::new()
            .upload(&API_METHOD_UPLOAD_FIXED_CHUNK)
    ),
    (
        "fixed_close", &Router::new()
            .post(&API_METHOD_CLOSE_FIXED_INDEX)
    ),
    (
        "fixed_index", &Router::new()
            .download(&API_METHOD_FIXED_CHUNK_INDEX)
            .post(&API_METHOD_CREATE_FIXED_INDEX)
            .put(&API_METHOD_FIXED_APPEND)
    ),
    (
        "speedtest", &Router::new()
            .upload(&API_METHOD_UPLOAD_SPEEDTEST)
    ),
];
234
/// Router used by the `H2Service` on the upgraded connection; `GET /` lists
/// the available subdirectories.
pub const BACKUP_API_ROUTER: Router = Router::new()
    .get(&list_subdirs_api_method!(BACKUP_API_SUBDIRS))
    .subdirs(BACKUP_API_SUBDIRS);
238
/// Descriptor for `POST dynamic_index`: create a dynamic chunk index writer
/// for the given archive name.
#[sortable]
pub const API_METHOD_CREATE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_dynamic_index),
    &ObjectSchema::new(
        "Create dynamic chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
        ]),
    )
);
249
250 fn create_dynamic_index(
251 param: Value,
252 _info: &ApiMethod,
253 rpcenv: &mut dyn RpcEnvironment,
254 ) -> Result<Value, Error> {
255
256 let env: &BackupEnvironment = rpcenv.as_ref();
257
258 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
259
260 let archive_name = name.clone();
261 if !archive_name.ends_with(".didx") {
262 bail!("wrong archive extension: '{}'", archive_name);
263 }
264
265 let mut path = env.backup_dir.relative_path();
266 path.push(archive_name);
267
268 let index = env.datastore.create_dynamic_writer(&path)?;
269 let wid = env.register_dynamic_writer(index, name)?;
270
271 env.log(format!("created new dynamic index {} ({:?})", wid, path));
272
273 Ok(json!(wid))
274 }
275
/// Descriptor for `POST fixed_index`: create a fixed chunk index writer for
/// an image of the given size.
#[sortable]
pub const API_METHOD_CREATE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&create_fixed_index),
    &ObjectSchema::new(
        "Create fixed chunk index file.",
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA),
            ("size", false, &IntegerSchema::new("File size.")
             .minimum(1)
             .schema()
            ),
        ]),
    )
);
290
291 fn create_fixed_index(
292 param: Value,
293 _info: &ApiMethod,
294 rpcenv: &mut dyn RpcEnvironment,
295 ) -> Result<Value, Error> {
296
297 let env: &BackupEnvironment = rpcenv.as_ref();
298
299 println!("PARAM: {:?}", param);
300
301 let name = tools::required_string_param(&param, "archive-name")?.to_owned();
302 let size = tools::required_integer_param(&param, "size")? as usize;
303
304 let archive_name = name.clone();
305 if !archive_name.ends_with(".fidx") {
306 bail!("wrong archive extension: '{}'", archive_name);
307 }
308
309 let mut path = env.backup_dir.relative_path();
310 path.push(archive_name);
311
312 let chunk_size = 4096*1024; // todo: ??
313
314 let index = env.datastore.create_fixed_writer(&path, size, chunk_size)?;
315 let wid = env.register_fixed_writer(index, name, size, chunk_size as u32)?;
316
317 env.log(format!("created new fixed index {} ({:?})", wid, path));
318
319 Ok(json!(wid))
320 }
321
/// Descriptor for `PUT dynamic_index`: append previously uploaded chunks
/// (digest + archive offset pairs) to a dynamic writer.
#[sortable]
pub const API_METHOD_DYNAMIC_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&dynamic_append),
    &ObjectSchema::new(
        "Append chunk to dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            ),
        ]),
    )
);
354
355 fn dynamic_append (
356 param: Value,
357 _info: &ApiMethod,
358 rpcenv: &mut dyn RpcEnvironment,
359 ) -> Result<Value, Error> {
360
361 let wid = tools::required_integer_param(&param, "wid")? as usize;
362 let digest_list = tools::required_array_param(&param, "digest-list")?;
363 let offset_list = tools::required_array_param(&param, "offset-list")?;
364
365 if offset_list.len() != digest_list.len() {
366 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
367 }
368
369 let env: &BackupEnvironment = rpcenv.as_ref();
370
371 env.debug(format!("dynamic_append {} chunks", digest_list.len()));
372
373 for (i, item) in digest_list.iter().enumerate() {
374 let digest_str = item.as_str().unwrap();
375 let digest = proxmox::tools::hex_to_digest(digest_str)?;
376 let offset = offset_list[i].as_u64().unwrap();
377 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
378
379 env.dynamic_writer_append_chunk(wid, offset, size, &digest)?;
380
381 env.debug(format!("successfully added chunk {} to dynamic index {} (offset {}, size {})", digest_str, wid, offset, size));
382 }
383
384 Ok(Value::Null)
385 }
386
/// Descriptor for `PUT fixed_index`: append previously uploaded chunks
/// (digest + image offset pairs) to a fixed writer.
#[sortable]
pub const API_METHOD_FIXED_APPEND: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&fixed_append),
    &ObjectSchema::new(
        "Append chunk to fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "digest-list",
                false,
                &ArraySchema::new("Chunk digest list.", &CHUNK_DIGEST_SCHEMA).schema()
            ),
            (
                "offset-list",
                false,
                &ArraySchema::new(
                    "Chunk offset list.",
                    &IntegerSchema::new("Corresponding chunk offsets.")
                        .minimum(0)
                        .schema()
                ).schema()
            )
        ]),
    )
);
419
420 fn fixed_append (
421 param: Value,
422 _info: &ApiMethod,
423 rpcenv: &mut dyn RpcEnvironment,
424 ) -> Result<Value, Error> {
425
426 let wid = tools::required_integer_param(&param, "wid")? as usize;
427 let digest_list = tools::required_array_param(&param, "digest-list")?;
428 let offset_list = tools::required_array_param(&param, "offset-list")?;
429
430 if offset_list.len() != digest_list.len() {
431 bail!("offset list has wrong length ({} != {})", offset_list.len(), digest_list.len());
432 }
433
434 let env: &BackupEnvironment = rpcenv.as_ref();
435
436 env.debug(format!("fixed_append {} chunks", digest_list.len()));
437
438 for (i, item) in digest_list.iter().enumerate() {
439 let digest_str = item.as_str().unwrap();
440 let digest = proxmox::tools::hex_to_digest(digest_str)?;
441 let offset = offset_list[i].as_u64().unwrap();
442 let size = env.lookup_chunk(&digest).ok_or_else(|| format_err!("no such chunk {}", digest_str))?;
443
444 env.fixed_writer_append_chunk(wid, offset, size, &digest)?;
445
446 env.debug(format!("successfully added chunk {} to fixed index {} (offset {}, size {})", digest_str, wid, offset, size));
447 }
448
449 Ok(Value::Null)
450 }
451
/// Descriptor for `POST dynamic_close`: finalize a dynamic writer, letting
/// the server verify chunk count, total size and digest-list checksum.
#[sortable]
pub const API_METHOD_CLOSE_DYNAMIC_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_dynamic_index),
    &ObjectSchema::new(
        "Close dynamic index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Dynamic writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
484
485 fn close_dynamic_index (
486 param: Value,
487 _info: &ApiMethod,
488 rpcenv: &mut dyn RpcEnvironment,
489 ) -> Result<Value, Error> {
490
491 let wid = tools::required_integer_param(&param, "wid")? as usize;
492 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
493 let size = tools::required_integer_param(&param, "size")? as u64;
494 let csum_str = tools::required_string_param(&param, "csum")?;
495 let csum = proxmox::tools::hex_to_digest(csum_str)?;
496
497 let env: &BackupEnvironment = rpcenv.as_ref();
498
499 env.dynamic_writer_close(wid, chunk_count, size, csum)?;
500
501 env.log(format!("successfully closed dynamic index {}", wid));
502
503 Ok(Value::Null)
504 }
505
/// Descriptor for `POST fixed_close`: finalize a fixed writer, letting the
/// server verify chunk count, total size and digest-list checksum.
#[sortable]
pub const API_METHOD_CLOSE_FIXED_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::Sync(&close_fixed_index),
    &ObjectSchema::new(
        "Close fixed index writer.",
        &sorted!([
            (
                "wid",
                false,
                &IntegerSchema::new("Fixed writer ID.")
                    .minimum(1)
                    .maximum(256)
                    .schema()
            ),
            (
                "chunk-count",
                false,
                &IntegerSchema::new("Chunk count. This is used to verify that the server got all chunks.")
                    .minimum(1)
                    .schema()
            ),
            (
                "size",
                false,
                &IntegerSchema::new("File size. This is used to verify that the server got all data.")
                    .minimum(1)
                    .schema()
            ),
            ("csum", false, &StringSchema::new("Digest list checksum.").schema()),
        ]),
    )
);
538
539 fn close_fixed_index (
540 param: Value,
541 _info: &ApiMethod,
542 rpcenv: &mut dyn RpcEnvironment,
543 ) -> Result<Value, Error> {
544
545 let wid = tools::required_integer_param(&param, "wid")? as usize;
546 let chunk_count = tools::required_integer_param(&param, "chunk-count")? as u64;
547 let size = tools::required_integer_param(&param, "size")? as u64;
548 let csum_str = tools::required_string_param(&param, "csum")?;
549 let csum = proxmox::tools::hex_to_digest(csum_str)?;
550
551 let env: &BackupEnvironment = rpcenv.as_ref();
552
553 env.fixed_writer_close(wid, chunk_count, size, csum)?;
554
555 env.log(format!("successfully closed fixed index {}", wid));
556
557 Ok(Value::Null)
558 }
559
560 fn finish_backup (
561 _param: Value,
562 _info: &ApiMethod,
563 rpcenv: &mut dyn RpcEnvironment,
564 ) -> Result<Value, Error> {
565
566 let env: &BackupEnvironment = rpcenv.as_ref();
567
568 env.finish_backup()?;
569 env.log("successfully finished backup");
570
571 Ok(Value::Null)
572 }
573
/// Descriptor for downloading the previous snapshot's dynamic chunk index
/// (used by the client to register known chunks for deduplication).
#[sortable]
pub const API_METHOD_DYNAMIC_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&dynamic_chunk_index),
    &ObjectSchema::new(
        r###"
Download the dynamic chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);
587
/// Stream the dynamic chunk index (`.didx`) of the previous snapshot.
///
/// Returns an empty `200 OK` body when there is no previous backup or the
/// archive does not exist in it. Otherwise every chunk of the old index is
/// registered as "known" in the backup environment (so the client may append
/// those digests without re-uploading), and the digest list is streamed back.
fn dynamic_chunk_index(
    _parts: Parts,
    _req_body: Body,
    param: Value,
    _info: &ApiMethod,
    rpcenv: Box<dyn RpcEnvironment>,
) -> ApiResponseFuture {

    async move {
        let env: &BackupEnvironment = rpcenv.as_ref();

        let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();

        if !archive_name.ends_with(".didx") {
            bail!("wrong archive extension: '{}'", archive_name);
        }

        // used for both "no previous backup" and "archive not in previous backup"
        let empty_response = {
            Response::builder()
                .status(StatusCode::OK)
                .body(Body::empty())?
        };

        let last_backup = match &env.last_backup {
            Some(info) => info,
            None => return Ok(empty_response),
        };

        let mut path = last_backup.backup_dir.relative_path();
        path.push(&archive_name);

        // a missing archive in the previous snapshot is not an error
        let index = match env.datastore.open_dynamic_reader(path) {
            Ok(index) => index,
            Err(_) => {
                env.log(format!("there is no last backup for archive '{}'", archive_name));
                return Ok(empty_response);
            }
        };

        env.log(format!("download last backup index for archive '{}'", archive_name));

        // make every chunk of the previous index known to this session
        let count = index.index_count();
        for pos in 0..count {
            let info = index.chunk_info(pos)?;
            let size = info.size() as u32;
            env.register_chunk(info.digest, size)?;
        }

        let reader = DigestListEncoder::new(Box::new(index));

        let stream = WrappedReaderStream::new(reader);

        // fixme: set size, content type?
        let response = http::Response::builder()
            .status(200)
            .body(Body::wrap_stream(stream))?;

        Ok(response)
    }.boxed()
}
648
/// Descriptor for downloading the previous snapshot's fixed chunk index
/// (used by the client to register known chunks for deduplication).
#[sortable]
pub const API_METHOD_FIXED_CHUNK_INDEX: ApiMethod = ApiMethod::new(
    &ApiHandler::AsyncHttp(&fixed_chunk_index),
    &ObjectSchema::new(
        r###"
Download the fixed chunk index from the previous backup.
Simply returns an empty list if this is the first backup.
"### ,
        &sorted!([
            ("archive-name", false, &crate::api2::types::BACKUP_ARCHIVE_NAME_SCHEMA)
        ]),
    )
);
662
663 fn fixed_chunk_index(
664 _parts: Parts,
665 _req_body: Body,
666 param: Value,
667 _info: &ApiMethod,
668 rpcenv: Box<dyn RpcEnvironment>,
669 ) -> ApiResponseFuture {
670
671 async move {
672 let env: &BackupEnvironment = rpcenv.as_ref();
673
674 let archive_name = tools::required_string_param(&param, "archive-name")?.to_owned();
675
676 if !archive_name.ends_with(".fidx") {
677 bail!("wrong archive extension: '{}'", archive_name);
678 }
679
680 let empty_response = {
681 Response::builder()
682 .status(StatusCode::OK)
683 .body(Body::empty())?
684 };
685
686 let last_backup = match &env.last_backup {
687 Some(info) => info,
688 None => return Ok(empty_response),
689 };
690
691 let mut path = last_backup.backup_dir.relative_path();
692 path.push(&archive_name);
693
694 let index = match env.datastore.open_fixed_reader(path) {
695 Ok(index) => index,
696 Err(_) => {
697 env.log(format!("there is no last backup for archive '{}'", archive_name));
698 return Ok(empty_response);
699 }
700 };
701
702 env.log(format!("download last backup index for archive '{}'", archive_name));
703
704 let count = index.index_count();
705 let image_size = index.index_bytes();
706 for pos in 0..count {
707 let digest = index.index_digest(pos).unwrap();
708 // Note: last chunk can be smaller
709 let start = (pos*index.chunk_size) as u64;
710 let mut end = start + index.chunk_size as u64;
711 if end > image_size { end = image_size; }
712 let size = (end - start) as u32;
713 env.register_chunk(*digest, size)?;
714 }
715
716 let reader = DigestListEncoder::new(Box::new(index));
717
718 let stream = WrappedReaderStream::new(reader);
719
720 // fixme: set size, content type?
721 let response = http::Response::builder()
722 .status(200)
723 .body(Body::wrap_stream(stream))?;
724
725 Ok(response)
726 }.boxed()
727 }