]>
Commit | Line | Data |
---|---|---|
1 | import { HttpClient } from '@angular/common/http'; | |
2 | import { Injectable } from '@angular/core'; | |
3 | ||
4 | import _ from 'lodash'; | |
5 | import { BehaviorSubject, Observable, Subscription } from 'rxjs'; | |
6 | import { filter, first } from 'rxjs/operators'; | |
7 | ||
8 | import { ExecutingTask } from '../models/executing-task'; | |
9 | import { Summary } from '../models/summary.model'; | |
10 | import { TimerService } from './timer.service'; | |
11 | ||
12 | @Injectable({ | |
13 | providedIn: 'root' | |
14 | }) | |
15 | export class SummaryService { | |
16 | readonly REFRESH_INTERVAL = 5000; | |
17 | // Observable sources | |
18 | private summaryDataSource = new BehaviorSubject<Summary>(null); | |
19 | // Observable streams | |
20 | summaryData$ = this.summaryDataSource.asObservable(); | |
21 | ||
22 | constructor(private http: HttpClient, private timerService: TimerService) {} | |
23 | ||
24 | startPolling(): Subscription { | |
25 | return this.timerService | |
26 | .get(() => this.retrieveSummaryObservable(), this.REFRESH_INTERVAL) | |
27 | .subscribe(this.retrieveSummaryObserver()); | |
28 | } | |
29 | ||
30 | refresh(): Subscription { | |
31 | return this.retrieveSummaryObservable().subscribe(this.retrieveSummaryObserver()); | |
32 | } | |
33 | ||
34 | private retrieveSummaryObservable(): Observable<Summary> { | |
35 | return this.http.get<Summary>('api/summary'); | |
36 | } | |
37 | ||
38 | private retrieveSummaryObserver(): (data: Summary) => void { | |
39 | return (data: Summary) => { | |
40 | this.summaryDataSource.next(data); | |
41 | }; | |
42 | } | |
43 | ||
44 | /** | |
45 | * Subscribes to the summaryData and receive only the first, non undefined, value. | |
46 | */ | |
47 | subscribeOnce(next: (summary: Summary) => void, error?: (error: any) => void): Subscription { | |
48 | return this.summaryData$ | |
49 | .pipe( | |
50 | filter((value) => !!value), | |
51 | first() | |
52 | ) | |
53 | .subscribe(next, error); | |
54 | } | |
55 | ||
56 | /** | |
57 | * Subscribes to the summaryData, | |
58 | * which is updated periodically or when a new task is created. | |
59 | * Will receive only non undefined values. | |
60 | */ | |
61 | subscribe(next: (summary: Summary) => void, error?: (error: any) => void): Subscription { | |
62 | return this.summaryData$.pipe(filter((value) => !!value)).subscribe(next, error); | |
63 | } | |
64 | ||
65 | /** | |
66 | * Inserts a newly created task to the local list of executing tasks. | |
67 | * After that, it will automatically push that new information | |
68 | * to all subscribers. | |
69 | */ | |
70 | addRunningTask(task: ExecutingTask) { | |
71 | const current = this.summaryDataSource.getValue(); | |
72 | if (!current) { | |
73 | return; | |
74 | } | |
75 | ||
76 | if (_.isArray(current.executing_tasks)) { | |
77 | const exists = current.executing_tasks.find((element: any) => { | |
78 | return element.name === task.name && _.isEqual(element.metadata, task.metadata); | |
79 | }); | |
80 | if (!exists) { | |
81 | current.executing_tasks.push(task); | |
82 | } | |
83 | } else { | |
84 | current.executing_tasks = [task]; | |
85 | } | |
86 | ||
87 | this.summaryDataSource.next(current); | |
88 | } | |
89 | } |