import { Injectable } from '@angular/core';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
+import { filter } from 'rxjs/operators';
import { cdEncode, cdEncodeNot } from '../decorators/cd-encode';
+import { MirroringSummary } from '../models/mirroring-summary';
import { TimerService } from '../services/timer.service';
import { ApiModule } from './api.module';
export class RbdMirroringService {
readonly REFRESH_INTERVAL = 30000;
// Observable sources
- private summaryDataSource = new BehaviorSubject(null);
+ private summaryDataSource = new BehaviorSubject<MirroringSummary>(null);
// Observable streams
summaryData$ = this.summaryDataSource.asObservable();
return this.retrieveSummaryObservable().subscribe(this.retrieveSummaryObserver());
}
- private retrieveSummaryObservable(): Observable<Object> {
+ private retrieveSummaryObservable(): Observable<MirroringSummary> {
return this.http.get('api/block/mirroring/summary');
}
- private retrieveSummaryObserver(): (data: any) => void {
+ private retrieveSummaryObserver(): (data: MirroringSummary) => void {
return (data: any) => {
this.summaryDataSource.next(data);
};
}
- /**
- * Returns the current value of summaryData
- */
- getCurrentSummary(): { [key: string]: any; executing_tasks: object[] } {
- return this.summaryDataSource.getValue();
- }
-
/**
* Subscribes to the summaryData,
* which is updated periodically or when a new task is created.
*/
- subscribeSummary(next: (summary: any) => void, error?: (error: any) => void): Subscription {
- return this.summaryData$.subscribe(next, error);
+ subscribeSummary(
+ next: (summary: MirroringSummary) => void,
+ error?: (error: any) => void
+ ): Subscription {
+ return this.summaryData$.pipe(filter((value) => !!value)).subscribe(next, error);
}
getPool(poolName: string) {