import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { APIData } from 'common-utils/dist/models/api';
import { isNull } from 'common-utils/dist/typescript-utils';
import * as _ from 'lodash';
import * as moment from 'moment';
import { IMqttServiceOptions, MqttConnectionState, MqttService } from 'ngx-mqtt';
import { CookieService } from 'ngx-shared-services';
import { MessageService } from 'primeng/api';
import { Observable, iif, merge, never, of, throwError, timer } from 'rxjs';
import {
  catchError,
  concatMap,
  distinctUntilChanged,
  expand,
  map,
  retryWhen,
  shareReplay,
  switchMap,
  tap,
  withLatestFrom
} from 'rxjs/operators';
import { Connection, ConnectionRenew, compare as compareConnections } from '../models/connection';
import { EventNode } from '../models/event-node';
import { NodePerformanceIot } from '../models/iot';
import { NodePerformance, NodePerformanceAPI, parse } from '../models/node-performance';
import { ApiEndpointsService } from './api-endpoints.service';

// When true, the tap() operators in PerformanceApiService.getPerformanceData log each stream emission to the console.
const DEBUG = false;

/**
 * Streams the performance readings of an event node: an initial HTTP snapshot that is,
 * for products reporting at minute granularity or coarser, kept up to date by MQTT push
 * updates delivered over a self-renewing connection.
 */
@Injectable({providedIn: 'root'})
export class PerformanceApiService{

  constructor(
    private http: HttpClient,
    private apiService: ApiEndpointsService,
    private cookieService: CookieService, // not read by this class at the moment; kept so the constructor signature is unchanged
    private toaster: MessageService,
    private mqttService: MqttService,
  ) {
  }


  /**
   * Builds the observable of performance readings for one event node.
   *
   * Successful emissions hold one entry per expected reporting interval between the buffered
   * node start and end; intervals without a reading are filled with placeholders by gapFilling.
   *
   * - Returns an observable that never emits when the event has no start or no end.
   * - When the product's reporting interval compares below 60000 ms, only the HTTP snapshot is emitted.
   * - Otherwise the snapshot is merged with a new array for every MQTT update received.
   *
   * All HTTP calls are sent with `withCredentials: true`.
   *
   * @param node event node whose performance should be loaded
   * @returns a stream of gap-filled performance arrays that replays its latest value to late subscribers
   */
  getPerformanceData(node: EventNode): Observable<NodePerformance[]> {
    const {emsUrl} = this.apiService;
    // Most recent gap-filled result. Written by both init$ and pubsub$ so that every IoT update
    // is applied on top of the latest known data.
    let perfValues: NodePerformance[] = [];

    if (isNull(node.event.event_start) || isNull(node.event.event_end)) {
      return never();
    }

    // Display window: the node's own start/end widened by its pre/post event buffers (milliseconds).
    const startDateTime = moment(node.event_node_start.fullDate).subtract(node.pre_event_buffer, 'milliseconds');
    const endDateTime: moment.Moment = moment(node.event_node_end.fullDate).add(node.post_event_buffer, 'milliseconds');

    // Interval length in ms: 5 minutes by default, otherwise the product's reporting interval, but never below 1 minute.
    let frequency = 300000;
    if (node.event.product && node.event.product.reporting_interval_ms != null && node.event.product.reporting_interval_ms > 0) {
      frequency = Math.max(node.event.product.reporting_interval_ms, 60000);
    }
    const frequencyInMinutes = frequency/60000;

    // Snap both window bounds to the nearest multiple of the interval length within their hour.
    startDateTime.startOf('minute').minute(frequencyInMinutes * Math.round(startDateTime.minute() / frequencyInMinutes)); // TODO think of a better way for this. We should not need to round to the nearest X minute to show intervals
    endDateTime.startOf('minute').minute(frequencyInMinutes * Math.round(endDateTime.minute() / frequencyInMinutes));     // TODO think of a better way for this. We should not need to round to the nearest X minute to show intervals

    const intervals = determineIntervals(startDateTime, endDateTime, frequency);

    // One-off snapshot of the stored readings, normalised and padded to the interval grid.
    const init$ = this.http.get<APIData<NodePerformanceAPI[]>>(`${emsUrl}/v1/event_node_performance/${node.site.id}/${node.id}`, {withCredentials: true}).pipe(
      map(({data}) => data),
      map((intervalData) => massageTheIntervals(intervalData, node.event.product)),
      map((nodes) => nodes.map((evn) => parse(evn))),
      map((perf) => gapFilling(perf, intervals, node.event.id, node.site.id, node.id)),
      tap((resp) => {
        perfValues = resp; // remember the snapshot so pubsub$ can patch it
      }),
      // TODO error handling
      tap((events) => DEBUG && console.log('performance-api | init', events)),
      catchError((err) => { // no retry: log, notify the user and fall back to an empty list (perfValues is left untouched)
        console.log(err);
        this.toaster.add({key: 'connection-error', severity: 'error', sticky: false});
        return of<NodePerformance[]>([]);
      }),
      shareReplay(1),
    );

    // Requests the connection details (endpoint, topic, ttl, client id) used for push updates of this node.
    const connection$ = this.http.post<APIData<Connection>>(`${emsUrl}/v1/connection`, {ids: [node.id], resource: 'NODE_PERFORMANCE'},{withCredentials: true}).pipe(
      map(({data}) => data),
      tap(() => {
      }, (err) => {
        console.log(err);
        this.toaster.add({key: 'connection-error', severity: 'error', sticky: false});
      }), // log and notify on every failed attempt, but let the error through so retryWhen can act on it
      retryWhen((errors) => errors.pipe(concatMap((error, i) => iif(() => i < 3, timer(2000), throwError(error))))), // retry every 2000ms, up to 3 times, then forward any error
      tap((connection) => DEBUG && console.log('performance-api | connection', connection)),
      catchError((err) => { // retries exhausted: log, notify, and complete without emitting so no MQTT subscription is attempted
        console.log(err);
        this.toaster.add({key: 'connection-error', severity: 'error', sticky: false});
        return of<Connection>();
      }),
      shareReplay(1),
    );

    // Emits the initial connection and then a refreshed one shortly before each expiry.
    // NOTE(review): the renew URL has no `/v1` segment, unlike the two other endpoints — confirm this is intended.
    const renew$ = connection$.pipe(
      switchMap((__connection) => of(__connection).pipe( // switchMap to automatically cancel previous subscription
        expand((_connection) => timer((_connection.expires_seconds * .9) * 1000).pipe( // renew at 90% of ttl
          switchMap(() => this.http.get<APIData<ConnectionRenew>>(`${emsUrl}/connection/renew/${_connection.client_id}`, {withCredentials: true})),
          map(({data}) => data),
          // TODO error handling (a failed renew currently errors renew$ and everything built on it)
          withLatestFrom(connection$),
          map(([renew, connection]) => ({...connection, ...renew} as Connection)), // renewed fields override the original connection's
          tap((connection) => DEBUG && console.log('performance-api | renew', connection)),
        )), // the expand operator above automatically forwards its first value
      )),
      shareReplay(1),
    );

    // Live updates: (re)subscribe to the MQTT topic whenever the connection changes and fold each message into perfValues.
    const pubsub$ = renew$.pipe(
      distinctUntilChanged(compareConnections),
      switchMap((connection) => {
        // Split "<protocol>://<hostname><path>" into the parts expected by MqttService.connect.
        const match = /(.*?):\/\/(.*?)(\/.*)/.exec(connection.endpoint_url);
        if (!match) return throwError('bad connection');
        const [, protocol, hostname, path] = match;
        if (this.mqttService.state.value === MqttConnectionState.CLOSED) { // only connect when the client is closed
          this.mqttService.connect({protocol: (protocol as IMqttServiceOptions['protocol']), hostname, path, port: 443});
        }
        return this.mqttService.observe(connection.topic);
      }),
      map((mqttMessage) => JSON.parse(new TextDecoder('utf-8').decode(mqttMessage.payload)) as NodePerformanceIot),
      // TODO handle errors from MQTT
      map((iotUpdate) => updateNodes(perfValues, iotUpdate)),
      map((perf) => gapFilling(perf, intervals, node.event.id, node.site.id, node.id)),
      tap((resp) => {
        perfValues = resp; // the next update is applied on top of this result
      }),
      tap((events) => DEBUG && console.log('performance-api | pubsub', events)),
      shareReplay(1),
    );

    // Products whose reporting interval compares below one minute get the snapshot only; everything else also receives live updates.
    // NOTE(review): `null < 60000` is true in JavaScript, so a product with a null reporting_interval_ms is snapshot-only — confirm.
    const snapshotOnly = node.event.product && node.event.product.reporting_interval_ms < 60000;
    const performances$: Observable<NodePerformance[]> = snapshotOnly ? init$ : merge(init$, pubsub$);

    return performances$.pipe(
      shareReplay(1),
    );
  }

}

/**
 * Merges one incoming IoT performance update into the performance array.
 *
 * The entry whose `interval.time` equals the update's `interval_dttm_utc` (compared as epoch
 * milliseconds) is replaced by a freshly parsed entry made of its old model overlaid with the
 * update. The input array is never mutated.
 *
 * @param performances current performance entries
 * @param update IoT message carrying the interval timestamp and the new values
 * @returns a new array with the matching entry replaced, or `performances` itself when no entry matches
 */
export function updateNodes(performances: NodePerformance[], update: NodePerformanceIot): NodePerformance[] {
  // Nothing to match against: hand the array back without looking at the update.
  if (performances.length === 0) {
    return performances;
  }

  // Parse the update's timestamp once instead of once per array element.
  const updateTime = new Date(update.interval_dttm_utc).getTime();
  const node = performances.find(({interval}) => interval.time === updateTime);
  if (!node) {
    return performances;
  }

  return performances.map((perf) => (perf === node) ? parse({...node.model, ...update} as NodePerformanceAPI) : perf);
}

/**
 * Collapses raw interval rows to one row per minute for products whose reporting interval
 * compares below one minute.
 *
 * When `product` is set and its `reporting_interval_ms` is below 60000, every row's
 * `interval_dttm_utc` is overwritten in place with the ISO string of the start of its minute,
 * and only the first row for each resulting timestamp is kept. In every other case
 * `intervalData` is returned untouched.
 *
 * @param intervalData rows carrying an `interval_dttm_utc` timestamp
 * @param product product of the event; may be null or undefined
 * @returns the de-duplicated rows, or `intervalData` itself when no truncation applies
 */
export function massageTheIntervals(intervalData, product) {
  const reportsSubMinute = product && product.reporting_interval_ms < 60000;
  if (!reportsSubMinute) {
    return intervalData;
  }

  for (const row of intervalData) {
    row.interval_dttm_utc = moment(row.interval_dttm_utc).startOf('minute').toISOString();
  }
  return _.uniqBy(intervalData, 'interval_dttm_utc');
}


/**
 * Aligns performance readings to the expected interval grid.
 *
 * For every timestamp in `intervals` the first reading whose `model.interval_dttm_utc` is the
 * exact same string is used; where none exists a placeholder with null values is created via
 * `parse`. Readings whose timestamp is not in `intervals` are dropped. Matching is by string
 * equality, so the readings' timestamps must be formatted like the entries of `intervals`.
 *
 * @param performances readings to align
 * @param intervals expected interval timestamps, in output order
 * @param eventId event id stamped on placeholders
 * @param siteId site id stamped on placeholders
 * @param nodeId event node id stamped on placeholders
 * @returns one entry per element of `intervals`, in the same order
 */
export function gapFilling(performances: NodePerformance[], intervals: string[], eventId: string, siteId: string, nodeId: string): NodePerformance[] {
  // Index the readings once so each interval lookup is O(1) instead of a scan of the whole array.
  // Only the first reading per timestamp is stored, matching what Array.prototype.find would return.
  const readingByInterval = new Map<string, NodePerformance>();
  for (const reading of performances) {
    const key = reading.model.interval_dttm_utc;
    if (!readingByInterval.has(key)) {
      readingByInterval.set(key, reading);
    }
  }

  return intervals.map((interval) => {
    const existing = readingByInterval.get(interval);
    if (existing) {
      return existing;
    }

    // No reading for this interval: emit a placeholder so consumers still get a full grid.
    return parse({
      event_node_id: nodeId,
      site_id: siteId,
      event_id: eventId,
      interval_dttm_utc: interval,
      interval_dttm_utc_program_locale: 'en_US', // setting it to default for now
      metered_value: null,
      metered_uom: null,
      baseline_value: null,
      baseline_uom: null,
      target_value: null,
      target_uom: null,
      adjusted_baseline_value: null,
    } as NodePerformanceAPI);
  });
}

/**
 * Lists the ISO timestamps of the interval starts in the half-open range [startTime, endTime).
 *
 * Neither argument is modified: iteration happens on a clone of `startTime`.
 *
 * @param startTime first interval start (inclusive)
 * @param endTime end of the range (exclusive)
 * @param granularity interval length in milliseconds
 * @returns the interval start timestamps in ascending order; empty when `startTime` is not
 *          before `endTime` or when `granularity` is not a positive number
 */
export function determineIntervals(startTime: moment.Moment, endTime: moment.Moment, granularity: number): string[] {
  const range: string[] = [];

  // A zero, negative or NaN step would never reach endTime and loop forever.
  if (!(granularity > 0)) {
    return range;
  }

  // moment's add() mutates in place, so walk a copy rather than the caller's instance.
  const cursor = startTime.clone();
  while (cursor < endTime) {
    range.push(cursor.toISOString());
    cursor.add(granularity, 'milliseconds');
  }

  return range;
}
