]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | import { HttpClient } from '@angular/common/http'; |
1911f103 | 2 | import { Injectable } from '@angular/core'; |
11fdf7f2 | 3 | |
f67539c2 | 4 | import _ from 'lodash'; |
1911f103 | 5 | import { BehaviorSubject, Observable, Subscription } from 'rxjs'; |
f6b5b4d7 | 6 | import { filter, first } from 'rxjs/operators'; |
11fdf7f2 TL |
7 | |
8 | import { ExecutingTask } from '../models/executing-task'; | |
f6b5b4d7 | 9 | import { Summary } from '../models/summary.model'; |
1911f103 | 10 | import { TimerService } from './timer.service'; |
11fdf7f2 TL |
11 | |
12 | @Injectable({ | |
81eedcae | 13 | providedIn: 'root' |
11fdf7f2 TL |
14 | }) |
15 | export class SummaryService { | |
1911f103 | 16 | readonly REFRESH_INTERVAL = 5000; |
11fdf7f2 | 17 | // Observable sources |
f6b5b4d7 | 18 | private summaryDataSource = new BehaviorSubject<Summary>(null); |
11fdf7f2 TL |
19 | // Observable streams |
20 | summaryData$ = this.summaryDataSource.asObservable(); | |
21 | ||
1911f103 | 22 | constructor(private http: HttpClient, private timerService: TimerService) {} |
11fdf7f2 | 23 | |
1911f103 TL |
24 | startPolling(): Subscription { |
25 | return this.timerService | |
26 | .get(() => this.retrieveSummaryObservable(), this.REFRESH_INTERVAL) | |
27 | .subscribe(this.retrieveSummaryObserver()); | |
11fdf7f2 TL |
28 | } |
29 | ||
1911f103 TL |
30 | refresh(): Subscription { |
31 | return this.retrieveSummaryObservable().subscribe(this.retrieveSummaryObserver()); | |
32 | } | |
11fdf7f2 | 33 | |
f6b5b4d7 TL |
34 | private retrieveSummaryObservable(): Observable<Summary> { |
35 | return this.http.get<Summary>('api/summary'); | |
11fdf7f2 TL |
36 | } |
37 | ||
f6b5b4d7 TL |
38 | private retrieveSummaryObserver(): (data: Summary) => void { |
39 | return (data: Summary) => { | |
1911f103 TL |
40 | this.summaryDataSource.next(data); |
41 | }; | |
11fdf7f2 TL |
42 | } |
43 | ||
44 | /** | |
f6b5b4d7 | 45 | * Subscribes to the summaryData and receive only the first, non undefined, value. |
11fdf7f2 | 46 | */ |
f6b5b4d7 TL |
47 | subscribeOnce(next: (summary: Summary) => void, error?: (error: any) => void): Subscription { |
48 | return this.summaryData$ | |
49 | .pipe( | |
50 | filter((value) => !!value), | |
51 | first() | |
52 | ) | |
53 | .subscribe(next, error); | |
11fdf7f2 TL |
54 | } |
55 | ||
56 | /** | |
57 | * Subscribes to the summaryData, | |
1911f103 | 58 | * which is updated periodically or when a new task is created. |
f6b5b4d7 | 59 | * Will receive only non undefined values. |
11fdf7f2 | 60 | */ |
f6b5b4d7 TL |
61 | subscribe(next: (summary: Summary) => void, error?: (error: any) => void): Subscription { |
62 | return this.summaryData$.pipe(filter((value) => !!value)).subscribe(next, error); | |
11fdf7f2 TL |
63 | } |
64 | ||
65 | /** | |
66 | * Inserts a newly created task to the local list of executing tasks. | |
67 | * After that, it will automatically push that new information | |
68 | * to all subscribers. | |
69 | */ | |
70 | addRunningTask(task: ExecutingTask) { | |
71 | const current = this.summaryDataSource.getValue(); | |
72 | if (!current) { | |
73 | return; | |
74 | } | |
75 | ||
76 | if (_.isArray(current.executing_tasks)) { | |
9f95a23c | 77 | const exists = current.executing_tasks.find((element: any) => { |
11fdf7f2 TL |
78 | return element.name === task.name && _.isEqual(element.metadata, task.metadata); |
79 | }); | |
80 | if (!exists) { | |
81 | current.executing_tasks.push(task); | |
82 | } | |
83 | } else { | |
84 | current.executing_tasks = [task]; | |
85 | } | |
86 | ||
87 | this.summaryDataSource.next(current); | |
88 | } | |
89 | } |