import process from 'node:process';
import mqtt, {
  type Client,
  type IClientOptions,
  type IClientSubscribeOptions,
  type IPublishPacket,
  type OnMessageCallback,
} from 'mqtt';
import { sleep } from '../functions/utils';

const MQTT_BROKER = process.env.MQTT_BROKER || 'mqtt://localhost:1883';
const MQTT_USERNAME = process.env.MQTT_SERVICE_USERNAME;
const MQTT_PASSWORD = process.env.MQTT_SERVICE_PASSWORD;
const DEFAULT_CONNECT_TIMEOUT = 30 * 1000;

interface Logger {
  info(message: string): void;
  error(message: string): void;
}

type OnConnectCallback = () => void;
type OnEndCallback = () => void;
export interface Options extends IClientOptions {
  onConnect?: OnConnectCallback;
  onMessage?: OnMessageCallback;
  onEnd?: OnEndCallback;
  onDissconnect?: CallableFunction;
  logger?: Logger;
  host?: string;
}
const noop = () => {};

export const FAILED_TO_CONNECT_ERROR = new Error('Failed to connect to Racemap MQTT Broker.');

class MQTTClient {
  private _host: string = MQTT_BROKER;
  private _mqttClientOptions: Options;
  private _mqttClient: Client | null = null;
  private _onConnect: OnConnectCallback;
  private _onMessage: OnMessageCallback;
  private _onEnd: OnEndCallback;
  private _onDisconnect: CallableFunction;
  private _logger: Logger | Console | null = null;
  private _isDissconnected: boolean;

  constructor(options: Options = {}) {
    this._mqttClientOptions = options;
    this._onConnect = options.onConnect || noop;
    this._onMessage = options.onMessage || noop;
    this._onEnd = options.onEnd || noop;
    this._onDisconnect = options.onDissconnect || noop;
    this._logger = options.logger || console;
    this._isDissconnected = true;

    if (options.host != null) {
      this._host = options.host;
    }

    if (MQTT_USERNAME != null && MQTT_PASSWORD != null) {
      this._mqttClientOptions.username = MQTT_USERNAME;
      this._mqttClientOptions.password = MQTT_PASSWORD;
    }
  }

  connect = async () => {
    if (this.connected) return;
    this._mqttClient = mqtt.connect(this._host, this._mqttClientOptions);

    this._mqttClient.on('connect', this.handleConnect);
    this._mqttClient.on('message', this.handleMessage);
    this._mqttClient.on('end', this.handleEnd);
    this._mqttClient.on('close', this.handleDisconnect);

    const startTime = Date.now();

    while (!this.connected) {
      if (Date.now() - startTime > this.connectionTimeout) {
        this._mqttClient.end();
        throw FAILED_TO_CONNECT_ERROR;
      }
      await sleep(100);
    }

    return;
  };

  subscribe = async (
    topic: string,
    options: IClientSubscribeOptions = { qos: 0 },
  ): Promise<Array<mqtt.ISubscriptionGrant>> => {
    return new Promise((resolve, reject) => {
      this._mqttClient?.subscribe(topic, options, (err, granted) => {
        if (err != null) {
          this._logger?.error(err.message);
          reject(err);
        }
        if (granted.length === 0) {
          this._logger?.info('Subscripted to topics again.');
        } else {
          this._logger?.info(
            `Subscripted to topics [${granted.map((grant) => grant.topic).join(', ')}]`,
          );
        }
        resolve(granted);
      });
    });
  };

  publish = async (
    topic: string,
    message: Buffer | string,
    options: mqtt.IClientPublishOptions = { qos: 0 },
  ): Promise<mqtt.Packet | undefined> => {
    return new Promise((resolve, reject) => {
      this._mqttClient?.publish(topic, message, options, (err, packet) => {
        if (err != null) reject(err);
        resolve(packet);
      });
    });
  };

  end = () => {
    this._logger?.info('end called');
    this._mqttClient?.end();
  };

  handleConnect = () => {
    this._logger?.info(`Connected to MQTT Broker ${this._host}. Register Topics.`);
    this._isDissconnected = false;
    this._onConnect();
  };

  handleMessage = (topic: string, payload: Buffer, packet: IPublishPacket) => {
    if (payload[payload.length - 1] === 13) {
      return this._onMessage(topic, payload.slice(0, payload.length - 1), packet);
    }

    this._onMessage(topic, payload, packet);
  };

  handleEnd = () => {
    this._logger?.info('Connection to mqtt broker endet.');
    this._onEnd();
  };

  handleDisconnect = () => {
    if (!this._isDissconnected) {
      this._logger?.info('Dissconnect from mqtt broker.');
      this._onDisconnect();
    }
    this._isDissconnected = true;
  };

  get connected(): boolean {
    return this._mqttClient?.connected ?? false;
  }

  get connectionTimeout(): number {
    if (this._mqttClient == null) return -1;
    const timeout = this._mqttClient.options.connectTimeout;
    return timeout != null ? timeout : DEFAULT_CONNECT_TIMEOUT;
  }
}

export default MQTTClient;
