import {
    AbstractLogger,
    SocketAuthenticationUpstreamMessage,
    SocketChannelRequestDownstreamMessage,
    SocketChannelRequestUpstreamMessage,
    SocketChannels,
    SocketPlainMessage,
} from "@clairejs/core";
import { Observable, Subscriber, Subscription } from "rxjs";

import { AbstractTokenManager } from "../api/AbstractTokenManager";
import { AbstractHttpClient } from "../api/AbstractHttpClient";
import { AbstractClientSocketManager } from "./AbstractClientSocketManager";
import { SocketConfig } from "./SocketConfig";
import { ClientSocket } from "./ClientSocket";
import { IClientSocket } from "./IClientSocket";
import { IWebSocket } from "./IWebSocket";

/**
 * Multiplexes any number of logical {@link IClientSocket}s over one underlying
 * {@link IWebSocket} obtained from `wsProvider`.
 *
 * Each logical socket declares the channels it needs. It is reported as connected
 * once every one of those channels has been acknowledged by a `CHANNEL_JOIN_REQUEST`
 * reply, and as disconnected as soon as one of them is no longer joined.
 *
 * Within this class the physical connection is only opened by {@link forceReconnect}
 * and by the internal retry loop; {@link create} and connection requests merely
 * register channels and send join requests if the connection is already authenticated.
 *
 * Note: failures in this class are thrown as plain strings, not `Error` instances.
 */
export class DefaultClientSocketManager extends AbstractClientSocketManager {
    /** The current physical connection, or `undefined` when none is open/opening. */
    private socket?: IWebSocket;

    /**
     * Bookkeeping for every logical socket handed out by {@link create} that has not
     * requested disconnection yet. The `*Subscriber` fields are filled lazily, when the
     * matching observable is subscribed to.
     */
    private allSockets: {
        socket: ClientSocket;
        connected?: boolean;
        channels: string[];
        /** Subscriptions opened on connection request; released on disconnection request. */
        subscriptions: Subscription[];

        connectionSubscriber?: Subscriber<boolean>;
        channelsSubscriber?: Subscriber<string[]>;
        messageReceivingSubscriber?: Subscriber<SocketPlainMessage>;
        messageSendingSubscriber?: Subscriber<SocketPlainMessage>;

        messageSendingObservable?: Observable<SocketPlainMessage>;
        channelsObservable?: Observable<string[]>;
    }[] = [];

    /**
     * Every channel any logical socket has asked for. `requested` means a join request
     * still has to be (re)sent; `connected` means the server acknowledged the join.
     */
    private allChannels: { channel: string; requested: boolean; connected: boolean }[] = [];

    /** True once an `AUTHENTICATION_CHANNEL` reply was received; reset when the socket closes. */
    private socketConnected?: boolean;
    private pingIntervalId?: any;
    /** Handle of the pending retry timer; must be `undefined` whenever no timer is pending. */
    private retryTimeoutId?: any;
    /** Number of pings sent since the last `PING_PONG_CHANNEL` message was received. */
    private accumulatedPing = 0;
    /** Delay before the next retry attempt, in milliseconds. */
    private retryDelay = 0;
    /** When true, the close handler does not start the retry loop. */
    private intendedDisconnection?: boolean;

    /**
     * @param wsProvider Factory invoked for every connection attempt.
     * @param tokenManager Supplies the access token sent on `AUTHENTICATION_CHANNEL` after open.
     * @param httpClient Used to refresh the token when an empty `DISCONNECTION_CHANNEL` message arrives.
     * @param logger Receives debug traces of every frame and error reports.
     * @param config Optional keep-alive and reconnect tuning; defaults are 3 missed pings,
     *               10000 ms ping interval and 3000 ms reconnect back-off increment.
     */
    constructor(
        private readonly wsProvider: () => IWebSocket,
        private readonly tokenManager: AbstractTokenManager,
        private readonly httpClient: AbstractHttpClient,
        private readonly logger: AbstractLogger,
        private readonly config?: SocketConfig
    ) {
        super();
    }

    /**
     * Creates a logical socket bound to `channels` and registers it with this manager.
     *
     * The observables built here resolve their bookkeeping entry lazily (at subscribe
     * time), which is why they may reference `socket` before it is declared.
     *
     * @param channels Channels the logical socket needs; it counts as connected only
     *                 when all of them are joined.
     * @returns The new logical socket.
     * @throws A string when a callback runs for a socket that is no longer registered,
     *         or when a required subscriber/observable is missing.
     */
    create(channels: string[]): IClientSocket {
        const connectionObservable = new Observable<boolean>((subscriber) => {
            const socketInfo = this.getSocketInfo(socket);

            if (!socketInfo) {
                throw "Cannot resolve socket info in connectionObservable";
            }

            socketInfo.connectionSubscriber = subscriber;
        });

        const messageReceivingObservable = new Observable<any>((subscriber) => {
            const socketInfo = this.getSocketInfo(socket);
            if (!socketInfo) {
                throw "Cannot resolve socket info in messageReceiveingObservable";
            }
            socketInfo.messageReceivingSubscriber = subscriber;
        });

        const messageSendingObservable = new Observable<any>((subscriber) => {
            const socketInfo = this.getSocketInfo(socket);
            if (!socketInfo) {
                throw "Cannot resolve socket info in messageSendingObservable";
            }
            socketInfo.messageSendingSubscriber = subscriber;
        });

        const channelsObservable = new Observable<string[]>((subscriber) => {
            const socketInfo = this.getSocketInfo(socket);
            if (!socketInfo) {
                throw "Cannot resolve socket info in channelsObservable";
            }
            socketInfo.channelsSubscriber = subscriber;
        });

        const socket = new ClientSocket(
            messageReceivingObservable,
            connectionObservable,
            (_socket) => {
                //-- resolve the subscriber used to push outgoing messages
                const socketInfo = this.getSocketInfo(socket);
                if (!socketInfo || !socketInfo.messageSendingSubscriber) {
                    throw "Cannot resolve socket info in messageSendingSubscriber resolver";
                }
                return socketInfo.messageSendingSubscriber;
            },
            (_socket) => {
                //-- socket request connection
                const socketInfo = this.getSocketInfo(_socket);
                if (!socketInfo) {
                    throw "Cannot resolve socket info in connection request";
                }

                if (socketInfo.connected) {
                    return;
                }

                if (!socketInfo.channelsObservable) {
                    throw "Channels observer not available";
                }

                if (!socketInfo.messageSendingObservable) {
                    throw "Message sending observable not available";
                }

                //-- translate "list of joined channels" updates into connected/disconnected events
                const channelsSub = socketInfo.channelsObservable.subscribe({
                    next: (availableChannels) => {
                        if (!socketInfo.connected && socketInfo.channels.every((c) => availableChannels.includes(c))) {
                            //-- socket connected
                            socketInfo.connected = true;
                            if (!socketInfo.connectionSubscriber) {
                                throw "Connection subscriber not available";
                            }
                            socketInfo.connectionSubscriber.next(socketInfo.connected);
                        } else if (socketInfo.connected && socketInfo.channels.some((c) => !availableChannels.includes(c))) {
                            //-- socket disconnected
                            socketInfo.connected = false;
                            if (!socketInfo.connectionSubscriber) {
                                throw "Disconnection subscriber not available";
                            }
                            socketInfo.connectionSubscriber.next(socketInfo.connected);
                        }
                    },
                });
                socketInfo.subscriptions.push(channelsSub);

                //-- forward outgoing messages to the physical socket; the subscription is
                //-- tracked so that a disconnection request releases it as well
                const sendingSub = socketInfo.messageSendingObservable.subscribe({
                    next: (message) => {
                        if (!socketInfo.connected) {
                            throw "Socket not connected";
                        }
                        this.sendToChannel(SocketChannels.MESSAGE_CHANNEL, message);
                    },
                });
                socketInfo.subscriptions.push(sendingSub);

                //-- register channel
                for (const channel of socketInfo.channels) {
                    let channelInfo = this.allChannels.find((channelInfo) => channelInfo.channel === channel);
                    if (!channelInfo) {
                        channelInfo = { channel, requested: false, connected: false };
                        this.allChannels.push(channelInfo);
                    }
                    channelInfo.requested = true;
                }

                //-- check and request for missing channels
                this.checkChannelRequests();
            },
            (_socket) => {
                //-- socket request disconnection
                const socketInfo = this.getSocketInfo(socket);
                if (!socketInfo) {
                    throw "Cannot resolve socket info in disconnection request";
                }

                //-- remove socket
                socketInfo.subscriptions.forEach((sub) => sub.unsubscribe());

                const index = this.allSockets.indexOf(socketInfo);
                if (index >= 0) {
                    this.allSockets.splice(index, 1);
                }
            }
        );

        this.allSockets.push({ socket, channels, messageSendingObservable, channelsObservable, subscriptions: [] });

        return socket;
    }

    /**
     * Closes the physical connection on purpose and cancels any pending retry.
     * The close handler will not start the retry loop afterwards.
     */
    forceDisconnect() {
        this.intendedDisconnection = true;
        if (this.socket) {
            this.socket.close();
            this.socket = undefined;
        }

        if (this.retryTimeoutId) {
            clearTimeout(this.retryTimeoutId);
            //-- drop the handle too: the close handler only starts a retry loop when it is unset
            this.retryTimeoutId = undefined;
        }
    }

    /**
     * Re-establishes the physical connection.
     *
     * When the connection is authenticated, the socket is closed and the close handler
     * takes care of reconnecting. Otherwise the back-off is reset and, if no socket
     * exists, any pending retry is cancelled and a new connection is opened right away.
     */
    forceReconnect(): void {
        this.intendedDisconnection = false;
        if (this.socketConnected) {
            this.socket?.close();
        } else {
            this.retryDelay = 0;
            if (this.socket === undefined) {
                if (this.retryTimeoutId) {
                    clearTimeout(this.retryTimeoutId);
                    this.retryTimeoutId = undefined;
                }
                this.connect();
            }
        }
    }

    /**
     * Obtains a new physical socket from `wsProvider` and wires its open, message and
     * close handlers.
     *
     * NOTE(review): the handlers operate on shared state (`this.socket`, `this.socketConnected`),
     * not on the socket instance they were attached to; a late event from a superseded
     * socket may therefore affect the current one — confirm whether this can happen.
     */
    private connect() {
        this.socket = this.wsProvider();

        this.socket.onopen(() => {
            if (this.pingIntervalId) {
                clearInterval(this.pingIntervalId);
            }

            //-- authenticate first; the connection only counts as established once the server replies
            this.tokenManager
                .getAccessToken()
                .then((accessToken) => {
                    this.sendToChannel(SocketChannels.AUTHENTICATION_CHANNEL, {
                        authorizationData: accessToken?.token,
                    } as SocketAuthenticationUpstreamMessage);
                })
                .catch((err) => {
                    this.logger.error(err);
                    this.forceReconnect();
                });
        });

        this.socket.onmessage((data) => {
            if (!data) {
                return;
            }

            //-- a frame that is not a JSON object must not throw out of the handler
            let channel: string;
            let message: any;
            try {
                ({ channel, message } = JSON.parse(data) as { channel: string; message: any });
            } catch (err) {
                this.logger.error("Discarding malformed socket frame", err);
                return;
            }
            this.logger.debug("Receive", channel, message);

            switch (channel) {
                case SocketChannels.PING_PONG_CHANNEL:
                    {
                        //-- server is alive, reset the missed-ping counter
                        this.accumulatedPing = 0;
                    }
                    return;

                case SocketChannels.AUTHENTICATION_CHANNEL:
                    {
                        this.logger.debug("Socket connection established", message);
                        //-- socket authenticated
                        this.socketConnected = true;

                        //-- socket open, set interval
                        if (this.pingIntervalId) {
                            clearInterval(this.pingIntervalId);
                        }

                        this.accumulatedPing = 0;
                        this.pingIntervalId = setInterval(() => {
                            this.accumulatedPing += 1;
                            if (this.accumulatedPing > (this.config?.keepAlive?.deadThreashold || 3)) {
                                //-- socket connection lost, not intended
                                this.intendedDisconnection = false;
                                this.socket?.close();
                            } else {
                                this.sendToChannel(SocketChannels.PING_PONG_CHANNEL);
                            }
                        }, this.config?.keepAlive?.pingIntervalMs || 10000);

                        //-- connection is up: stop the retry loop
                        this.intendedDisconnection = false;
                        if (this.retryTimeoutId) {
                            clearTimeout(this.retryTimeoutId);
                            this.retryTimeoutId = undefined;
                        }

                        //-- check and send pending channel join requests
                        this.checkChannelRequests();
                    }
                    return;

                case SocketChannels.CHANNEL_JOIN_REQUEST:
                    {
                        const receivedMessage = message as SocketChannelRequestDownstreamMessage;
                        if (!receivedMessage.success) {
                            this.logger.error(receivedMessage.error);
                            return;
                        }

                        //-- mark acknowledged channels as joined and no longer pending
                        receivedMessage.channels.forEach((channel) => {
                            const channelInfo = this.allChannels.find((info) => info.channel === channel);
                            if (channelInfo) {
                                channelInfo.connected = true;
                                channelInfo.requested = false;
                            }
                        });

                        this.notifyCurrentChannels();
                    }
                    return;

                case SocketChannels.CHANNEL_LEAVE_REQUEST:
                    {
                        const receivedMessage = message as SocketChannelRequestDownstreamMessage;
                        if (!receivedMessage.success) {
                            this.logger.error(receivedMessage.error);
                            return;
                        }

                        receivedMessage.channels.forEach((channel) => {
                            const channelInfo = this.allChannels.find((info) => info.channel === channel);
                            if (channelInfo) {
                                channelInfo.connected = false;
                            }
                        });

                        this.notifyCurrentChannels();
                    }
                    return;

                case SocketChannels.MESSAGE_CHANNEL:
                    {
                        const receivedMessage = message as SocketPlainMessage;

                        //-- deliver only to logical sockets bound to the message's channel
                        this.allSockets.forEach((socketInfo) => {
                            if (!socketInfo.channels.includes(receivedMessage.channel)) {
                                return;
                            }
                            socketInfo.messageReceivingSubscriber?.next(receivedMessage);
                        });
                    }
                    return;

                case SocketChannels.DISCONNECTION_CHANNEL:
                    {
                        if (!message) {
                            //-- retry once
                            //-- NOTE(review): `message` is falsy in this branch yet is passed to
                            //-- refreshToken — confirm this is the intended argument
                            this.httpClient
                                .refreshToken(message)
                                .then(() => {
                                    if (!!this.socket) {
                                        this.intendedDisconnection = false;
                                        this.socket.close();
                                    } else {
                                        this.forceReconnect();
                                    }
                                })
                                .catch((err) => {
                                    this.logger.debug(err);
                                    this.intendedDisconnection = true;
                                    this.socket?.close();
                                });
                        } else {
                            this.intendedDisconnection = true;
                            this.socket?.close();
                        }
                    }
                    return;
                default:
                    return;
            }
        });

        this.socket.onclose(() => {
            if (this.pingIntervalId) {
                clearInterval(this.pingIntervalId);
            }

            this.socket = undefined;
            this.socketConnected = false;

            //-- clear all channels and notify for disconnection
            for (const channel of this.allChannels) {
                channel.connected = false;
                channel.requested = true;
            }
            this.notifyCurrentChannels();

            this.logger.debug("Socket connnection closed, intended ", this.intendedDisconnection);

            if (this.intendedDisconnection) {
                //-- do nothing more
                this.logger.debug("Socket connection terminated");
            } else {
                //-- skip if a retry timer is already pending
                if (!this.retryTimeoutId) {
                    this.retryDelay = 0;
                    this.retry();
                }
            }
        });
    }

    /**
     * Opens a new connection immediately, then schedules another attempt. The delay
     * grows by `config.reconnectTimeDeltaMs` (default 3000 ms) on every call. The
     * scheduled attempt only runs if the connection is still not authenticated and the
     * disconnection was not intended.
     *
     * The value logged is the delay as it was before this call's increment.
     */
    private retry() {
        this.logger.debug(`Socket connection retrying in ${this.retryDelay}ms`);
        this.connect();
        this.retryDelay += this.config?.reconnectTimeDeltaMs || 3000;
        this.retryTimeoutId = setTimeout(() => {
            //-- the timer has fired: forget its handle so it cannot block a later retry loop
            this.retryTimeoutId = undefined;
            if (!this.socketConnected && !this.intendedDisconnection) {
                this.retry();
            }
        }, this.retryDelay);
    }

    /**
     * Sends one `CHANNEL_JOIN_REQUEST` for all channels still flagged as requested.
     * Does nothing until the connection is authenticated or when nothing is pending.
     */
    private checkChannelRequests() {
        if (!this.socketConnected) {
            return;
        }

        const pendingChannels = this.allChannels.filter((c) => c.requested).map((c) => c.channel);
        pendingChannels.length && this.sendToChannel(SocketChannels.CHANNEL_JOIN_REQUEST, { channels: pendingChannels } as SocketChannelRequestUpstreamMessage);
    }

    /** Pushes the list of currently joined channels to every logical socket that listens for it. */
    private notifyCurrentChannels() {
        const boundChannels = this.allChannels.filter((c) => c.connected).map((c) => c.channel);
        this.allSockets.forEach((socketInfo) => socketInfo.channelsSubscriber?.next(boundChannels));
    }

    /** Returns the bookkeeping entry of `socket`, or `undefined` if it is not registered. */
    private getSocketInfo(socket: IClientSocket) {
        return this.allSockets.find((info) => info.socket === socket);
    }

    /**
     * Serializes `{ channel, message }` as JSON and sends it over the physical socket.
     *
     * @throws The string `"Socket not available"` when no physical socket exists.
     */
    private sendToChannel(channel: string, message?: any) {
        this.logger.debug("Send", channel, message);
        if (!this.socket) {
            throw "Socket not available";
        }
        const data = JSON.stringify({ channel, message });
        this.socket.send(data);
    }
}
