import { Injectable, OnDestroy, Inject } from '@angular/core';
import { Observable, SubscriptionLike, Subject, Observer, interval, BehaviorSubject } from 'rxjs';
import { filter, map } from 'rxjs/operators';
import { WebSocketSubject, WebSocketSubjectConfig } from 'rxjs/webSocket';

import { share, distinctUntilChanged, takeWhile } from 'rxjs/operators';
import { Helper, Guid } from '../classes';
import { HttpService } from './http.service';

export interface IWebsocketService {
  on<T>(event: string): Observable<T>;
  send(event: string, data: any): void;
  status: Observable<boolean>;
}

export interface IWsMessage<T> {
  event: string;
  data: T;
}
export class WebSocketConfig {
  url: string;
  reconnectInterval?: number;
  reconnectAttempts?: number;
}

export class WebsocketService implements IWebsocketService, OnDestroy {

    private config: WebSocketSubjectConfig<IWsMessage<any>>;

    private websocketSub: SubscriptionLike;
    private statusSub: SubscriptionLike;

    private reconnection$: Observable<number>;
    private websocket$: WebSocketSubject<IWsMessage<any>>;
    private connection$: Observer<boolean>;
    private wsMessages$: Subject<IWsMessage<any>>;

    private reconnectInterval: number;
    private reconnectAttempts: number;
    private isConnected: boolean;


    public status: Observable<boolean>;

    constructor(wsConfig: WebSocketConfig) {
        this.wsMessages$ = new Subject<IWsMessage<any>>();

        this.reconnectInterval = wsConfig.reconnectInterval || 5000; // pause between connections
        this.reconnectAttempts = wsConfig.reconnectAttempts || 10; // number of connection attempts

        this.config = {
            url: wsConfig.url,
            closeObserver: {
                next: (event: CloseEvent) => {
                    this.websocket$ = null;
                    this.connection$.next(false);
                }
            },
            openObserver: {
                next: (event: Event) => {
                    console.log('WebSocket connected!');
                    this.connection$.next(true);
                }
            }
        };

        // connection status
        this.status = new Observable<boolean>((observer) => {            1
            this.connection$ = observer;
        }).pipe(share(), distinctUntilChanged());

        // run reconnect if not connection
        this.statusSub = this.status
            .subscribe((isConnected) => {
                this.isConnected = isConnected;

                if (!this.reconnection$ && typeof(isConnected) === 'boolean' && !isConnected) {
                    this.reconnect();
                }
            });

      this.websocketSub = this.wsMessages$.subscribe(
        (mes: any) => {
       //   console.log(mes)
        }, (error: ErrorEvent) => console.error('WebSocket error!', error)
        );

        this.connect();
    }

    ngOnDestroy() {
        this.websocketSub.unsubscribe();
        this.statusSub.unsubscribe();
    }
    /*
    * connect to WebSocked
    * */
    private connect(): void {
        this.websocket$ = new WebSocketSubject(this.config);

        this.websocket$.subscribe(
          (message) => {
            if (message.event.includes('Added') || message.event.includes('Deleted') || message.event.includes('ContextObject')) {
              console.log('ws message ' + message.event + ' data ');
              console.log(message.data);
            }
          this.wsMessages$.next(message); },
            (error: Event) => {
                if (!this.websocket$) {
                    // run reconnect if errors
                    this.reconnect();
                }
            });
    }


    public refreshConnect(wsConfig: WebSocketConfig): void {
      this.config = wsConfig;
      this.connect();
    }
    /*
    * reconnect if not connecting or errors
    * */
    private reconnect(): void {
        this.reconnection$ = interval(this.reconnectInterval)
            .pipe(takeWhile((v, index) => index < this.reconnectAttempts && !this.websocket$));

        this.reconnection$.subscribe(
            () => this.connect(),
            null,
            () => {
                // Subject complete if reconnect attemts ending
                this.reconnection$ = null;

                if (!this.websocket$) {
                    this.wsMessages$.complete();
                    this.connection$.complete();
                }
            });
    }


    /*
    * on message event
    * */
    public on<T>(event: string): Observable<T> {

      if (event) {
        let mes=this.wsMessages$.pipe(

          filter((message: IWsMessage<T>) => message.event === event),
          map((message: IWsMessage<T>) => message.data)
          );
        return mes;
      }
     
    }


    /*
    * on message to server
    * */
    public send(event: string, data: any = {}): void {
        if (event && this.isConnected) {
            this.websocket$.next(<any>JSON.stringify({ event, data }));
        } else {
            console.error('Send error!');
        }
    }

  public id: string;

  static getwsurl() {
    let http = Helper.webApiUrl();
    let prot = http.substring(0, 5);
    let wsurl = (prot == 'https') ? 'wss' + http.substring(5) + 'ws' : 'ws' + http.substring(4) + 'ws';
    console.log(wsurl);
    return wsurl;
  }
  private static loading = false;
  private static wsServiceSubject = new BehaviorSubject<WebsocketService>(null);
  public static Instance() {
    let ds = new Observable<WebsocketService>(sub => {
      WebsocketService.wsServiceSubject.subscribe(ws => {
        if (ws)
          sub.next(ws);
      });
      if (WebsocketService.loading)
        return;
      WebsocketService.loading = true;
      HttpService.GetToken()
      .then(Token => {
        var config = new WebSocketConfig();
        config.url = WebsocketService.getwsurl() + "?Bearer=" + Token;
        let wsService = new WebsocketService(config);
        wsService.id = Guid.NewGuid().ToString();
        WebsocketService.wsServiceSubject.next(wsService);
      });
    });
    return ds;
  }
}
