import * as _ from 'lodash';
import { BehaviorSubject, Observable, Subscription } from 'rxjs';
+import { filter, first } from 'rxjs/operators';
import { ExecutingTask } from '../models/executing-task';
+import { Summary } from '../models/summary.model';
import { TimerService } from './timer.service';
@Injectable({
export class SummaryService {
readonly REFRESH_INTERVAL = 5000;
// Observable sources
- private summaryDataSource = new BehaviorSubject(null);
+ private summaryDataSource = new BehaviorSubject<Summary>(null);
// Observable streams
summaryData$ = this.summaryDataSource.asObservable();
return this.retrieveSummaryObservable().subscribe(this.retrieveSummaryObserver());
}
- private retrieveSummaryObservable(): Observable<Object> {
- return this.http.get('api/summary');
+ private retrieveSummaryObservable(): Observable<Summary> {
+ return this.http.get<Summary>('api/summary');
}
- private retrieveSummaryObserver(): (data: any) => void {
- return (data: Object) => {
+ private retrieveSummaryObserver(): (data: Summary) => void {
+ return (data: Summary) => {
this.summaryDataSource.next(data);
};
}
/**
- * Returns the current value of summaryData
+ * Subscribes to the summaryData and receive only the first, non undefined, value.
*/
- getCurrentSummary(): { [key: string]: any; executing_tasks: object[] } {
- return this.summaryDataSource.getValue();
+ subscribeOnce(next: (summary: Summary) => void, error?: (error: any) => void): Subscription {
+ return this.summaryData$
+ .pipe(
+ filter((value) => !!value),
+ first()
+ )
+ .subscribe(next, error);
}
/**
* Subscribes to the summaryData,
* which is updated periodically or when a new task is created.
+ * Will receive only non undefined values.
*/
- subscribe(next: (summary: any) => void, error?: (error: any) => void): Subscription {
- return this.summaryData$.subscribe(next, error);
+ subscribe(next: (summary: Summary) => void, error?: (error: any) => void): Subscription {
+ return this.summaryData$.pipe(filter((value) => !!value)).subscribe(next, error);
}
/**