import { Output } from '../../base/outputs';
import { Observable, Subscriber } from 'rxjs';
import { QueueItemInput } from '../dtos/queue-item.input';
import { QueueItemOutput } from '../dtos/queue-item.output';
import { QueueItemStatus } from '../../signatures/enums/queue-item-status';
import { QueueServiceInterface } from '../interfaces/queue.service.interface';

export class QueueService<O extends Output> implements QueueServiceInterface<O> {
  private _subscriber!: Subscriber<QueueItemOutput<O>>;
  private _queue!: (QueueItemInput<O> & { output: QueueItemOutput<O> })[];

  handle(input: QueueItemInput<O>[]): Observable<QueueItemOutput<O>> {
    return new Observable<QueueItemOutput<O>>((subscriber: Subscriber<QueueItemOutput<O>>) => {
      this._subscriber = subscriber;
      this._queue = input.map((item) => ({
        ...item,
        output: new QueueItemOutput<O>({
          status: QueueItemStatus.Waiting,
          identifier: item.identifier,
        }),
      }));

      this.startQueue().then();
    });
  }

  private async startQueue(): Promise<void> {
    this._queue.forEach((item) => this._subscriber.next(item.output));

    for (const queueItem of this._queue) {
      queueItem.output.status = QueueItemStatus.Executing;
      this._subscriber.next(queueItem.output);

      queueItem.output.data = await queueItem.handleExecution();
      queueItem.output.status = queueItem.output.data.hasErrors ? QueueItemStatus.Error : QueueItemStatus.Success;

      this._subscriber.next(queueItem.output);
    }

    this._subscriber.complete();
  }
}
