import RSocketWebSocketClient from 'rsocket-websocket-client';
import { RSocketClient } from 'rsocket-core';
import { SpanStatus } from '@sentry/tracing';

const { Flowable } = require('rsocket-flowable');

/**
 * Default to use for 'requestStream' when the backend is
 * written in Java.
 */
const JAVA_MAX_SAFE_INTEGER = 2147483647;

const VueRSocket = {
    install(Vue, rsConfig) {
        if (this.installed) {
            return;
        }
        this.Sentry = Vue.$sentry;
        this.installed = true;
        this.rsConfig = rsConfig;
        this.rsSocket = new Promise((resolve, reject) => {
            this.rsSocketResolve = resolve;
            this.rsSocketReject = reject;
        }).catch((err) => {
            console.error(err);
        });
        this.rsConnection = Vue.observable(undefined);
        this.rsClient = Vue.observable(undefined);
        this.statusTransaction = null;
        this.connectionStatusSpan = null;

        Vue.$rs = Vue.prototype.$rs = this;

        this.debugLog("Installed 'vue-rsocket' plugin");
    },
    subscribeToStatusChange() {
        this.connectionStatusTransaction = this.Sentry.startTransaction({ name: 'RSocketConnectionStatusChange' });
        this.Sentry.getCurrentHub().configureScope((scope) => scope.setSpan(this.connectionStatusTransaction));

        const createStatusSpan = () => {
            this.connectionStatusSpan = this.connectionStatusTransaction.startChild({
                op: 'statusChanged',
                description: 'connection status changed',
            });
        };

        createStatusSpan();
        this.transport.connectionStatus().subscribe({
            onNext: (status) => {
                this.connectionStatusSpan.setStatus(SpanStatus.Ok);
                this.connectionStatusSpan.setData('status', status);
                this.connectionStatusSpan.finish();
                createStatusSpan();

                this.debugLog('Transport status changed: ' + status.kind);
            },
            onSubscribe: (subscription) => {
                subscription.request(JAVA_MAX_SAFE_INTEGER);
            },
        });
    },
    isDebug() {
        return this.rsConfig.debug;
    },
    debugLog(...message) {
        if (this.isDebug()) {
            console.debug(...message);
        }
    },
    noClientPresent() {
        return !this.rsClient;
    },
    noConnectionCreated() {
        return !this.rsConnection;
    },
    connect(url) {
        if (url && typeof url === 'string') {
            // for reconnection need to use: const resumableTransport = new RSocketResumableTransport(...)
            const { debug, keepAlive, lifetime, dataMimeType, metadataMimeType } = this.rsConfig;
            this.transport = new RSocketWebSocketClient({
                url,
                debug,
                wsCreator: (url) => {
                    return new WebSocket(url);
                },
            });

            this.rsocketClientOptions = {
                setup: {
                    keepAlive,
                    lifetime,
                    dataMimeType,
                    metadataMimeType,
                },
                transport: this.transport,
            };
        } else {
            this.debugLog("Socket URL is missing in 'vue-rsocket' plugin");
            return;
        }

        try {
            if (this.noConnectionCreated()) {
                this.subscribeToStatusChange();
                this.rsClient = new RSocketClient(this.rsocketClientOptions);
                this.rsConnection = this.rsClient.connect();
                this.rsConnection.subscribe({
                    onComplete: (socket) => {
                        this.debugLog('rsConnection onComplete', { socket });
                        this.socket = socket;

                        this.rsSocketResolve({ socket });
                    },
                    onError: (e) => {
                        console.error(e);
                        this.rsSocketReject(e);
                    },
                    onSubscribe: (cancel) => {
                        /* call cancel() to abort */
                        this.debugLog('rsConnection onSubscribe');
                    },
                });
            } else {
                console.warn('Already connected');
            }
        } catch (e) {
            console.error({ e });
        }
    },
    createTransactionAndSpan({ transactionName, operationName, operationDescription, operationData }) {
        // This will create a new Transaction for you
        const transaction = this.Sentry.startTransaction({ name: transactionName });
        // Set transaction on scope to associate with errors and get included span instrumentation
        // If there's currently an unfinished transaction, it may be dropped
        this.Sentry.getCurrentHub().configureScope((scope) => scope.setSpan(transaction));

        const span = transaction.startChild({
            /**
             * Data of the Span.
             */
            data: operationData,
            /**
             * Operation of the Span.
             */
            op: operationName,
            description: operationDescription,
        });

        return { transaction, span };
    },
    requestChannel(socket, route, onMessage, requestStreamInformation) {
        const { transaction, span } = this.createTransactionAndSpan({
            transactionName: 'RSocketRequestChannel',
            operationName: 'channelSubscription',
            operationDescription: 'channel subscription',
            operationData: { route, requestStreamInformation },
        });

        let currentTransaction = transaction;
        let currentSpan = span;

        this.debugLog(`requestChannel on route: "${route}, ${JSON.stringify(requestStreamInformation)}"`);

        const stream = Flowable.just({
            data: JSON.stringify(requestStreamInformation.data),
            metadata: String.fromCharCode(requestStreamInformation.metadata.length) + requestStreamInformation.metadata,
        });

        return new Promise((resolve, reject) => {
            let channelSubscription;
            if (this.noConnectionCreated()) {
                const err = new Error("Could not 'requestChannel'. No RSocket connection found");
                currentSpan.setStatus(SpanStatus.UnknownError);
                currentSpan.setData('error', err);
                currentSpan.finish();
                currentTransaction.finish();

                reject(err);
            }

            if (!socket) {
                const err = new Error(
                    `Could not 'requestChannel' without socket. Params: ${JSON.stringify({
                        socket,
                        route,
                        onMessage,
                        requestStreamInformation,
                    })}`
                );
                currentSpan.setStatus(SpanStatus.UnknownError);
                currentSpan.setData('error', err);
                currentSpan.finish();
                currentTransaction.finish();

                reject(err);
            }

            socket.requestChannel(stream).subscribe({
                onComplete: () => {
                    currentSpan.setStatus(SpanStatus.Ok);
                    currentSpan.setData('info', { completed: true });
                    currentSpan.finish();
                    currentTransaction.finish();

                    this.debugLog(`'requestChannel' for : "${route}, ${JSON.stringify(requestStreamInformation)}" completed`);
                    resolve(channelSubscription);
                },
                onError: (error) => {
                    currentSpan.setStatus(SpanStatus.UnknownError);
                    currentSpan.setData('error', error);
                    currentSpan.finish();
                    currentTransaction.finish();

                    console.error(`'requestChannel' for : "${route}, ${JSON.stringify(requestStreamInformation)}" error: ${error.message}`);
                    reject(error);
                },
                onNext: (messageData) => {
                    this.debugLog('requestChannel onNext', { messageData });
                    if (onMessage) {
                        onMessage(messageData, channelSubscription);
                    }
                },
                onSubscribe: (subscription) => {
                    currentSpan.setStatus(SpanStatus.Ok);
                    currentSpan.finish();
                    currentTransaction.finish();

                    // the previous transaction is completed, therefore we create a new one
                    // for cases of transition to onError or onComplete callbacks
                    const { transaction, span } = this.createTransactionAndSpan({
                        transactionName: 'RSocketRequestChannel',
                        operationName: 'channelSubscription',
                        operationDescription: 'channel subscription',
                        operationData: { route, requestStreamInformation },
                    });
                    currentTransaction = transaction;
                    currentSpan = span;

                    this.debugLog('requestChannel onSubscribe', { subscription });
                    channelSubscription = subscription.cancel;
                    subscription.request(JAVA_MAX_SAFE_INTEGER);
                    resolve(subscription.cancel);
                },
            });
        }).catch((err) => {
            console.error(err);
        });
    },
    requestStream(socket, route, onMessage, requestStreamInformation) {
        // doesn't work with current backend settings:
        // RSocket error 0x201 (APPLICATION_ERROR): Destination 'data' does not support REQUEST_STREAM.
        // Supported interaction(s): [REQUEST_CHANNEL]
        if (this.noConnectionCreated()) {
            throw new Error("Could not 'requestStream'. No RSocket connection found");
        }
        this.debugLog(`requestStream on route: "${route}"`);
        return new Promise((resolve, reject) => {
            let channelSubscription;
            if (!socket) {
                reject(
                    new Error(
                        `Could not 'requestStream' without socket. Params: ${JSON.stringify({
                            socket,
                            route,
                            onMessage,
                            requestStreamInformation,
                        })}`
                    )
                );
            }

            socket
                .requestStream({
                    data: JSON.stringify(requestStreamInformation.data),
                    metadata: String.fromCharCode(requestStreamInformation.metadata.length) + requestStreamInformation.metadata,
                })
                .subscribe({
                    onComplete: () => {
                        this.debugLog(`'requestStream' for : "${route}" completed`);
                        resolve(channelSubscription);
                    },
                    onError: (error) => {
                        console.error(`'requestStream' for : "${route}" error: ${error.message}`);
                        reject(error);
                    },
                    onNext: (messageData) => {
                        this.debugLog('requestStream onNext', { messageData });
                        if (onMessage) {
                            onMessage(messageData, channelSubscription);
                        }
                    },
                    onSubscribe: (subscription) => {
                        this.debugLog('requestStream onSubscribe', { subscription });
                        channelSubscription = subscription.cancel;
                        subscription.request(JAVA_MAX_SAFE_INTEGER);
                        resolve(subscription.cancel);
                    },
                });
        }).catch((err) => {
            console.error(err);
        });
    },
    connectionStatus() {
        return this.rsocketClientOptions.transport.connectionStatus();
    },
    disconnect() {
        // socket disconnect
        if (this.connectionStatusTransaction) {
            this.connectionStatusTransaction.finish();
        }
        if (this.rsConnection && this.rsClient) {
            this.rsClient.close();
            this.rsClient = undefined;
            this.rsConnection = undefined;
        }
    },
};
export default VueRSocket;
