import {Injectable, OnDestroy} from '@angular/core';
import {interval, Observable, Subject, Subscription} from "rxjs";
import { HttpClient } from "@angular/common/http";
import {takeUntil} from 'rxjs/operators';
import {JOB_MAX_NO_POLLS, JOB_POLLING_INTERVAL_MS, JOB_TYPE_BATCH_CORRECTION} from "../shared/globals";

/**
 * Payload emitted to subscribers of a job poll, and the shape expected from
 * the job-status endpoint.
 */
export interface JobPollResponse {
    /** `true` once the job has finished successfully; `false` while pending, on timeout or on error. */
    status: boolean;
    /** Human-readable description of the current state or failure. */
    message: string;
    /** Set to `true` on intermediate emissions while the job is still running. */
    processing?: boolean;
    /** Forwarded from the server response when the job completes. */
    filename?: string;
    /** Forwarded from the server response when the job completes. */
    data?: Record<string, any>;
}


/** Book-keeping kept for every job that is currently being polled. */
interface ActiveJobPoll {
    /** Subject handed back to the caller of `createJobPoll`. */
    observable: Subject<JobPollResponse>;
    /** Number of polls that came back with the job still pending. */
    numPolls: number;
    /** Job type passed to `createJobPoll`; decides whether the poll limit applies. */
    type?: string;
}

/**
 * Polls the job-status endpoint for background jobs.
 *
 * All registered jobs share ONE interval timer. The timer is started by the first
 * `createJobPoll` call and stops itself on the first tick that finds no active jobs.
 * Because the timer is shared, the `frequency` of the call that started it applies to
 * every job registered while it is running.
 */
@Injectable({
    providedIn: 'root'
})
export class BackgroundJobService implements OnDestroy {
    private activePollRequests$: { [key: string]: ActiveJobPoll } = {};
    private timerRunning: boolean = false;
    private jobPollEndpoint: string = '/api/job-status/';
    private timerSub?: Subscription;
    private readonly onDestroy = new Subject<void>();

    constructor(
        private http: HttpClient
    ) {
    }

    /** Adds a new poll for a job and starts the poll timer if it is not running.
     * All polls are stored on activePollRequests$ so that several jobs created in batch
     * can be tracked at the same time.
     *
     * Registering a job_id that is already being polled replaces the previous entry; the
     * previously returned Subject receives no further emissions.
     *
     * @param job_id id appended to the job-status endpoint
     * @param frequency polling interval in ms; only used when this call starts the timer
     *                  (defaults to JOB_POLLING_INTERVAL_MS)
     * @param type job type; jobs of type JOB_TYPE_BATCH_CORRECTION are never timed out
     * @returns Subject which emits `{status: false, processing: true}` on every pending poll
     *          and then one final JobPollResponse (success, error or time-out) before completing
     */
    createJobPoll(job_id: string, frequency?: number, type?: string): Subject<JobPollResponse> {
        const jobPoll$ = new Subject<JobPollResponse>();
        this.activePollRequests$[job_id] = {observable: jobPoll$, numPolls: 0, type};
        if (!this.timerRunning) {
            this.startTimer(frequency);
        }

        return jobPoll$;
    }

    /** Stops the shared timer; the next createJobPoll call starts a fresh one. */
    private stopTimer(): void {
        this.timerRunning = false;
        this.timerSub?.unsubscribe();
    }

    /** Starts the shared timer which, on every tick, checks all active polls in activePollRequests$.
     *
     * @param frequency tick interval in ms
     */
    private startTimer(frequency: number = JOB_POLLING_INTERVAL_MS): void {
        this.timerRunning = true;
        const timer$: Observable<number> = interval(frequency);
        this.timerSub = timer$.pipe(takeUntil(this.onDestroy)).subscribe(() => {
            const activeJobs = Object.entries(this.activePollRequests$);
            if (activeJobs.length === 0) {
                this.stopTimer();
                return;
            }

            for (const [jobID, jobData] of activeJobs) {
                // The limit is checked per job, so a long-running batch-correction job is exempt
                // no matter which job happened to start the timer.
                // NOTE(review): the original comment equated the limit with ~two minutes; that
                // depends on JOB_MAX_NO_POLLS and the polling interval — confirm.
                const timedOut = jobData.numPolls >= JOB_MAX_NO_POLLS
                    && jobData.type !== JOB_TYPE_BATCH_CORRECTION;
                if (timedOut) {
                    this.nextAndComplete(jobID, jobData.observable, {
                        status: false,
                        message: 'Job timed out'
                    });
                    continue;   // job is finished, do not poll it again
                }
                this.pollJob(jobID, jobData);
            }
        });
    }

    /** Sends one status request for a job and forwards the outcome to the job's subject. */
    private pollJob(jobID: string, jobData: ActiveJobPoll): void {
        // A response may arrive after the job was completed (or re-registered) by another
        // response; such stale responses must not touch the current entry.
        const isStale = (): boolean => this.activePollRequests$[jobID] !== jobData;

        this.http.get<JobPollResponse>(this.jobPollEndpoint + jobID)
            .pipe(takeUntil(this.onDestroy))    // cancel in-flight requests on destroy
            .subscribe({
                next: (resp) => {
                    if (isStale()) {
                        return;
                    }
                    if (resp.status) {
                        const message = resp.message || 'Job completed successfully';
                        this.nextAndComplete(jobID, jobData.observable, {
                            status: true,
                            message: message,
                            data: resp.data,
                            filename: resp.filename
                        });
                    } else {
                        jobData.observable.next({status: false, processing: true, message: resp.message});
                        jobData.numPolls += 1;
                    }
                },
                error: (err) => {
                    console.log('Error: ', err);
                    if (isStale()) {
                        return;
                    }
                    this.nextAndComplete(jobID, jobData.observable, {
                        status: false,
                        message: err.error?.error
                    });
                }
            });
    }

    /** Emits a final value on the job's subject, completes it and removes the job from the active polls.
     *
     * @param job_id key of the job in activePollRequests$
     * @param jobObs$ subject that was returned by createJobPoll for this job
     * @param emitValue final response delivered to subscribers
     */
    nextAndComplete(job_id: string, jobObs$: Subject<JobPollResponse>, emitValue: JobPollResponse): void {
        jobObs$.next(emitValue);
        jobObs$.complete();
        delete this.activePollRequests$[job_id];
    }

    /** Tears down the timer and any in-flight status requests. */
    ngOnDestroy(): void {
        this.onDestroy.next();
        // complete() rather than unsubscribe(): an unsubscribed Subject throws on any later use.
        this.onDestroy.complete();
        this.timerRunning = false;
    }
}
