import { HttpClient, HttpHeaders } from '@angular/common/http';
import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { map, publishLast, refCount, share, shareReplay, switchMap } from 'rxjs/operators';

import Centrifuge from '@common/centrifuge';
import { AppConfigService } from '@core';
import { ApiService } from '@modules/api';

import { CentrifugoMsg } from '../../data/centrifugo-msg';

/**
 * Root-level service that owns a single, lazily created Centrifuge client and
 * exposes channel subscriptions as shared RxJS observables.
 *
 * `subscribe()` and `subscribeState()` use the same per-channel cache, so the
 * first of the two called for a given channel decides which stream later
 * callers receive for that channel.
 */
@Injectable({
  providedIn: 'root'
})
export class CentrifugoService {
  /** Client instance; stays unset until `getCentrifuge()` or `connect()` is first called. */
  private centrifuge: Centrifuge;
  /** Shared channel streams keyed by channel name. */
  private channelObservables: { [channel: string]: Observable<CentrifugoMsg> } = {};

  constructor(private appConfigService: AppConfigService, private apiService: ApiService, private http: HttpClient) {}

  /**
   * Returns the Centrifuge client, creating it on first use.
   *
   * On creation the client is configured from `AppConfigService`
   * (`centrifugoUrl`, `production`) and `ApiService` (auth endpoint and
   * headers), and `connect()` is triggered once.
   */
  getCentrifuge(): Centrifuge {
    if (!this.centrifuge) {
      this.centrifuge = new Centrifuge(this.appConfigService.centrifugoUrl, {
        debug: !this.appConfigService.production,
        subscribeEndpoint: this.apiService.methodURL('centrifugo/auth/'),
        subscribeHeaders: this.getHeaders()
      });

      // The field is already assigned here, so connect() takes its normal path.
      this.connect();
    }

    return this.centrifuge;
  }

  /**
   * Builds a plain key/value object from the headers produced by
   * `ApiService.setHeadersToken()` applied to an empty `HttpHeaders`.
   */
  getHeaders() {
    const result = {};
    let headers = new HttpHeaders();

    headers = this.apiService.setHeadersToken(headers);
    headers.keys().forEach(key => {
      result[key] = headers.get(key);
    });

    return result;
  }

  /** Pushes freshly built subscribe headers to the client, if one exists. */
  refreshHeaders(): void {
    if (this.centrifuge) {
      this.centrifuge.setSubscribeHeaders(this.getHeaders());
    }
  }

  /**
   * Fetches a user token and connects the client with it.
   *
   * Safe to call before the client exists: in that case the client is created
   * through `getCentrifuge()`, which performs the connect itself.
   */
  public connect(): void {
    if (!this.centrifuge) {
      // No client yet: getCentrifuge() creates it and calls connect() again,
      // so returning here avoids requesting the token twice.
      this.getCentrifuge();
      return;
    }

    const client = this.centrifuge;

    this.getUserToken().subscribe(token => {
      client.setToken(token);
      client.connect();
    });
  }

  /** Disconnects the client, if one exists. */
  public disconnect(): void {
    if (this.centrifuge) {
      this.centrifuge.disconnect();
    }
  }

  /**
   * Returns a shared stream of live messages for `channel`.
   *
   * The stream is cached per channel and multicast with `share()`; the cache
   * entry is removed when the underlying channel subscription is torn down.
   *
   * @param channel Channel name to subscribe to.
   */
  public subscribe(channel: string): Observable<CentrifugoMsg> {
    this.refreshHeaders();

    if (!this.channelObservables[channel]) {
      this.channelObservables[channel] = this.createChannelSource(channel, false).pipe(share());
    }

    return this.channelObservables[channel];
  }

  /**
   * Returns a shared stream for `channel` that additionally emits the first
   * publication returned by a `history({ limit: 1 })` request and replays the
   * most recent emission to late subscribers via `shareReplay(1)`.
   *
   * NOTE(review): depending on the RxJS version, `shareReplay(1)` may keep the
   * source subscribed after the last subscriber leaves, in which case the
   * teardown (and cache removal) would not run — confirm against the
   * installed RxJS.
   *
   * @param channel Channel name to subscribe to.
   */
  public subscribeState(channel: string): Observable<CentrifugoMsg> {
    this.refreshHeaders();

    if (!this.channelObservables[channel]) {
      this.channelObservables[channel] = this.createChannelSource(channel, true).pipe(shareReplay(1));
    }

    return this.channelObservables[channel];
  }

  /**
   * Creates the cold observable backing a channel stream: subscribing to it
   * opens a Centrifuge channel subscription, unsubscribing closes it and drops
   * the cached stream for that channel.
   *
   * @param channel Channel name to subscribe to.
   * @param emitLastPublication When true, also requests `history({ limit: 1 })`
   *   and emits its first publication, if any.
   */
  private createChannelSource(channel: string, emitLastPublication: boolean): Observable<CentrifugoMsg> {
    return new Observable<CentrifugoMsg>(observer => {
      const subscription = this.getCentrifuge().subscribe(channel, message => observer.next(message));

      if (emitLastPublication) {
        // NOTE(review): a rejected history() promise is not handled here.
        subscription.history({ limit: 1 }).then(ctx => {
          const lastPublication = ctx.publications[0];
          if (lastPublication) {
            observer.next(lastPublication);
          }
        });
      }

      return () => {
        subscription.unsubscribe();
        delete this.channelObservables[channel];
      };
    });
  }

  /**
   * Refreshes the API token, then requests `centrifugo/token/` with the token
   * headers and emits the `token` field of the response. Errors pass through
   * `ApiService.catchApiError()`; the result is shared with
   * `publishLast()` + `refCount()`.
   */
  private getUserToken(): Observable<string> {
    return this.apiService.refreshToken().pipe(
      switchMap(() => {
        const url = this.apiService.methodURL('centrifugo/token/');
        let headers = new HttpHeaders();

        headers = this.apiService.setHeadersToken(headers);

        return this.http.get(url, { headers: headers });
      }),
      map(result => result['token']),
      this.apiService.catchApiError(),
      publishLast(),
      refCount()
    );
  }
}
