import { isNil } from 'lodash-es';
import type { Driver, RxSession, Session } from 'neo4j-driver';
import neo4j from 'neo4j-driver';
import { Subject, of } from 'rxjs';
import { bufferCount, catchError, finalize, map, takeUntil } from 'rxjs/operators';
import { v4 } from 'uuid';

import { getErrorMessage } from '../../modules/errorHandler';
import { MAX_QUERY_RESULT_LIMIT } from '../../state/settings/settings';
import type { Nullable } from '../../types/utility';
import { BoltError, isBoltCompatibleError, isForbiddenError, isNoProcedureError } from '../errors/errorUtils';
import { log } from '../logging';
import { getClientVersion } from '../versions/version';
import type { BaseQueryParams, ReadRxTransactionParams, WriteRxTransactionParams } from './bolt.types';
import * as mappings from './boltMappings';
import { TRANSACTIONS_CONFIGURATION } from './config';

// Database name passed to getSession for the system-level helpers and the driver probe.
const SYSTEM_DATABASE = 'system';
// Query type tags; the chosen tag is sent as the `type` field of the transaction metadata (see getConfig).
export const SYSTEM_QUERY = 'system';
export const USER_QUERY = 'user-action';
const DIRECT_QUERY = 'user-direct';
const TRANSPILED_QUERY = 'user-transpiled';
// Code assigned to reactive-transaction errors recognised as "explicitly terminated by the user".
export const ABORTED_ERROR_CODE = 'ABORTED_ERROR_CODE';
// Message of the error thrown inside the managed write transaction by writeAndRollbackTransaction.
export const MANAGED_ROLLBACK_ERROR = 'MANAGED_ROLLBACK_ERROR';

// Message of the error thrown by cancelTransaction when nothing is registered for the request id.
const ERROR_REQUEST_NOT_FOUND = 'Request Id not found';

// All query type tags; exposed through the default export.
const QueryTypes = [SYSTEM_QUERY, USER_QUERY, DIRECT_QUERY, TRANSPILED_QUERY];

// While true, getConfig returns transaction metadata; otherwise it returns null.
const configEnabled = true;
// Backs isAlive(); it is a constant `false`, so isAlive() currently always reports true.
const connectionDead = false;
// Sessions of in-flight requests, each with its request id and the callback used to cancel it.
let sessions: { session: Session | RxSession; id: string; onCancel: () => any }[] = [];
// Assigned by setDriver from driver.supportsMultiDb(); undefined until setDriver has run.
let isMultiDatabase: boolean;
// Fallback database name used by getDatabaseName when a call does not name one.
let currentDatabase: string | undefined;
// Driver installed by setDriver; reset to null by clearDriver.
let driver: Nullable<Driver>;
// Reported as `container_app` in the transaction metadata.
let containerApp = 'standalone';
// Abort handlers registered by readTransaction, keyed by request id; cancelTransaction checks these first.
const asyncSessionToAbortHandler: Record<string, () => void> = {};

/** Type guard that accepts only non-empty strings. */
const isValidName = (name: unknown): name is string => {
  if (typeof name !== 'string') {
    return false;
  }
  return name !== '';
};
/**
 * Picks the database name to use for a request.
 *
 * The explicitly requested `database` wins; otherwise the module-level
 * `currentDatabase` is used. Candidates that are not non-empty strings are skipped.
 *
 * @returns The first valid candidate, or `null` when neither is valid.
 */
const getDatabaseName = (database: string | undefined) => {
  for (const candidate of [database, currentDatabase]) {
    if (isValidName(candidate)) {
      return candidate;
    }
  }
  return null;
};

/**
 * Builds the transaction configuration carrying the metadata for a query.
 *
 * @param queryType Tag stored in the metadata `type` field.
 * @returns An object with a `metadata` property, or `null` when `configEnabled` is false.
 */
const getConfig = (queryType: string) => {
  if (!configEnabled) {
    return null;
  }

  const metadata = {
    app: 'bloom',
    container_app: containerApp,
    type: queryType,
    version: getClientVersion(),
    vendor: 'neo4j',
  };

  return { metadata };
};

/** Stores `database` as the fallback database consulted by getDatabaseName. */
const setCurrentDatabase = (database: string | undefined): void => {
  currentDatabase = database;
};

/**
 * Records the hosting application name that getConfig reports as `container_app`.
 *
 * @param container Name to record; defaults to 'standalone' when omitted.
 */
const setContainerApp = (container = 'standalone'): void => {
  containerApp = container;
};

/** Reports the negation of the module-level `connectionDead` flag. */
const isAlive = () => {
  const dead = connectionDead;
  return !dead;
};

/**
 * Installs `aDriver` as the module-level driver and probes it.
 *
 * After recording whether the driver supports multiple databases, a session is
 * obtained for the system database and a single procedure call is run on it
 * (`db.ping` when multi-database is supported, `db.indexes` otherwise).
 * Forbidden and no-such-procedure errors from that call are tolerated; the
 * probe session is closed without waiting for the close to finish.
 *
 * @throws BoltError wrapping the message of any other failure.
 */
const setDriver = async (aDriver: Driver) => {
  driver = aDriver;

  try {
    isMultiDatabase = await aDriver.supportsMultiDb();

    const probeSession = getSession(SYSTEM_DATABASE);
    if (probeSession == null) {
      return;
    }

    const probeQuery = isMultiDatabase ? 'CALL db.ping()' : 'CALL db.indexes()';
    const releaseProbeSession = () => {
      void probeSession.close();
    };

    await probeSession
      .run(probeQuery)
      .catch((probeError) => {
        const isTolerated = isForbiddenError(probeError) || isNoProcedureError(probeError);
        if (!isTolerated) {
          throw probeError;
        }
      })
      .finally(releaseProbeSession);
  } catch (err) {
    throw new BoltError(getErrorMessage(err));
  }
};

/** Forgets the installed driver; session factories return undefined afterwards. */
const clearDriver = (): void => {
  driver = null;
};

/**
 * Opens an async session on the installed driver.
 *
 * The session targets `database` only when multi-database is supported and a
 * string name was given; otherwise a session with default settings is opened.
 *
 * @returns The new session, or `undefined` when no driver is installed.
 */
const getSession = (database: Nullable<string>): Session | undefined => {
  if (isMultiDatabase && typeof database === 'string') {
    return driver?.session({ database });
  }
  return driver?.session();
};

/**
 * Opens a reactive session for `database` on the installed driver.
 *
 * @returns The new rx session, or `undefined` when multi-database is not
 *   supported, `database` is not a string, or no driver is installed.
 */
const getRxSession = (database: Nullable<string>) => {
  if (!isMultiDatabase || typeof database !== 'string') {
    return undefined;
  }
  return driver?.rxSession({ database });
};

/**
 * Registers a session as belonging to an in-flight request.
 *
 * @param session Session serving the request.
 * @param requestId Id under which the session can later be found.
 * @param onCancel Callback run when the request is cancelled; by default it
 *   unregisters and closes the session.
 */
const addToSessions = (
  session: Session | RxSession,
  requestId: string,
  onCancel: () => any = async () => clearSession(session, requestId),
) => {
  const entry = { session, id: requestId, onCancel };
  sessions.push(entry);
};

/**
 * Unregisters every tracked entry with the given request id, then closes `session`.
 */
const clearSession = async (session: Session | RxSession, requestId: string) => {
  const remaining = sessions.filter(({ id }) => id !== requestId);
  sessions = remaining;
  await closeSession(session);
};

/**
 * Closes an async or reactive session and logs once the close has completed.
 *
 * A reactive session's `close()` result exposes `subscribe`; it is subscribed
 * to (not awaited) and the log line is written on completion. An async
 * session's close promise is awaited, and the success message is logged only
 * after it has resolved, so a rejected close is never reported as successful.
 */
const closeSession = async (session: Session | RxSession) => {
  const closingSession = session.close();

  if ('subscribe' in closingSession) {
    closingSession.subscribe({
      complete() {
        log.debug('Rx session successfully closed');
      },
    });
    return;
  }

  await closingSession;
  log.debug('Async session successfully closed');
};

/**
 * Tells whether an error from a non-reactive transaction means the query was aborted.
 *
 * Matches the exact "closed session" message, or any message containing
 * "Explicitly terminated by the user".
 *
 * @returns `false` for errors that carry no string message, instead of throwing.
 */
const isQueryAbortedNoReactive = ({ message }: Error) => {
  // Guard first: this runs inside catch blocks, where a TypeError would mask the original error.
  if (typeof message !== 'string') {
    return false;
  }

  return (
    message === 'You cannot run more transactions on a closed session.' ||
    message.includes('Explicitly terminated by the user')
  );
};
/**
 * Tells whether an error from a reactive transaction means the query was
 * explicitly terminated by the user. Yields `undefined` when the error has no message.
 */
const isQueryAbortedReactive = ({ message }: Error) => {
  const abortMarker = 'Explicitly terminated by the user';
  return message?.includes(abortMarker);
};

/**
 * Runs `input` in a managed read transaction and returns the raw driver result.
 *
 * The transaction config merges `TRANSACTIONS_CONFIGURATION.RUN_QUERY` with the
 * metadata from getConfig (metadata keys win). The session is tracked under
 * `requestId` while the query runs and is always unregistered and closed afterwards.
 *
 * @throws Error('No session') when no session can be opened.
 * @throws The original error; bolt-compatible errors recognised as an abort are
 *   flagged with `ignore = true` first.
 */
const runQuery = async (
  input: string,
  { parameters = {}, type = SYSTEM_QUERY, database, requestId = v4() }: BaseQueryParams = {},
) => {
  const targetDatabase = getDatabaseName(database);
  const session = getSession(targetDatabase);

  if (isNil(session)) {
    throw new Error('No session');
  }

  const metadataConfig = getConfig(type);
  const txConfig =
    metadataConfig === null
      ? undefined
      : {
          ...TRANSACTIONS_CONFIGURATION.RUN_QUERY,
          ...metadataConfig,
        };

  addToSessions(session, requestId);

  try {
    return await session.executeRead((tx) => tx.run(input, parameters), txConfig);
  } catch (error) {
    if (isBoltCompatibleError(error) && isQueryAbortedNoReactive(error)) {
      error.ignore = true;
    }
    throw error;
  } finally {
    await clearSession(session, requestId);
  }
};

/** Options accepted by readTransaction, on top of the base query options. */
interface ReadTransactionParams extends BaseQueryParams {
  /** Transaction timeout; when null or undefined no `timeout` key is added to the transaction config. */
  timeout?: Nullable<number>;
  /** Handler stored under the request id; cancelTransaction calls it instead of the session's cancel callback. */
  forceAbortHandler?: () => void;
}

/**
 * Runs `input` in a managed read transaction on the resolved database.
 *
 * The session is tracked under `requestId` for the duration of the call. When
 * a `forceAbortHandler` is supplied it is registered under the same id so that
 * cancelTransaction can invoke it; the registration is removed again once the
 * request settles, so handlers do not accumulate and a later cancel for a
 * finished request does not call a stale handler.
 *
 * @param input Cypher text to run.
 * @returns For `USER_QUERY` requests, an object with mapped `records` and a
 *   mapped `summary`; otherwise the raw driver result.
 * @throws Error('No session') when no session can be opened.
 * @throws The original error; bolt-compatible errors recognised as an abort are
 *   flagged with `ignore = true` first.
 */
const readTransaction = async (
  input: string,
  {
    parameters = {},
    type = SYSTEM_QUERY,
    database,
    requestId = v4(),
    timeout = null,
    forceAbortHandler,
  }: ReadTransactionParams = {},
) => {
  const session = getSession(getDatabaseName(database));

  if (isNil(session)) {
    throw new Error('No session');
  }

  const config = getConfig(type);

  // Only pass a timeout key when one was actually requested.
  const extendedConfig =
    config !== null
      ? {
          ...(timeout != null ? { timeout } : {}),
          ...config,
        }
      : undefined;

  addToSessions(session, requestId);

  if (forceAbortHandler != null) {
    asyncSessionToAbortHandler[requestId] = forceAbortHandler;
  }

  try {
    const result = await session.executeRead((tx) => tx.run(input, parameters), extendedConfig);

    log.debug('readTransaction query', requestId, input, parameters);
    log.trace('readTransaction result', requestId, result);

    if (type === USER_QUERY) {
      const mappedResult = {
        records: result.records.map((record) => mappings.recordMapper(record)),
        summary: mappings.applyGraphTypes(result.summary),
      };
      return mappedResult;
    }
    return result;
  } catch (error) {
    if (!isBoltCompatibleError(error)) throw error;
    if (isQueryAbortedNoReactive(error)) {
      error.ignore = true;
    }
    throw error;
  } finally {
    // Remove our handler (and only ours) before closing, so it is released even if the close fails.
    if (forceAbortHandler != null && asyncSessionToAbortHandler[requestId] === forceAbortHandler) {
      // eslint-disable-next-line  @typescript-eslint/no-dynamic-delete
      delete asyncSessionToAbortHandler[requestId];
    }
    await clearSession(session, requestId);
  }
};

/**
 * Sets up `input` as a reactive managed read transaction.
 *
 * Uses `RxSession.executeRead`, matching the `executeRead` calls of the
 * non-reactive helpers, instead of the deprecated `readTransaction`.
 *
 * Records are grouped with `bufferCount(limit)` and each group is emitted as an
 * array of mapped records. Errors are not rethrown on the stream: they are
 * handed to `errorHandler` (together with `complete`) and then emitted as a
 * value. Errors recognised as a user abort are first flagged with
 * `ignore = true` and `code = ABORTED_ERROR_CODE`. The session is unregistered
 * and closed when the stream finalizes.
 *
 * @param input Cypher text to run.
 * @returns `result`, the observable described above, and `complete`, which
 *   signals the record stream to stop (it is also the request's cancel callback).
 * @throws Error('No session') when no reactive session can be opened.
 */
const readRxTransaction = async (
  input: string,
  {
    parameters = {},
    type = SYSTEM_QUERY,
    database,
    requestId = v4(),
    timeout,
    limit = MAX_QUERY_RESULT_LIMIT,
    errorHandler,
  }: ReadRxTransactionParams = {},
) => {
  const session = getRxSession(getDatabaseName(database));

  if (isNil(session)) {
    throw new Error('No session');
  }

  const config = getConfig(type);

  // Only pass a timeout key when one was actually requested.
  const extendedConfig =
    config !== null
      ? {
          ...(timeout != null ? { timeout } : {}),
          ...config,
        }
      : undefined;

  // Emitting on `destroy` ends the record stream through takeUntil below.
  const destroy = new Subject();
  const complete = () => {
    destroy.next(null);
    destroy.complete();
  };

  addToSessions(session, requestId, complete);

  const result = session
    .executeRead(
      (tx) =>
        tx
          .run(input, parameters)
          .records()
          .pipe(
            bufferCount(limit),
            map((data) => {
              log.debug('readRxTransaction query', requestId, input, parameters);
              log.trace('readRxTransaction result', requestId, data);
              return data.map(mappings.recordMapper);
            }),
            takeUntil(destroy),
          ),
      extendedConfig,
    )
    .pipe(
      catchError((error) => {
        if (isQueryAbortedReactive(error)) {
          error.ignore = true;
          error.code = ABORTED_ERROR_CODE;
        }
        errorHandler?.(error, complete);
        return of(error);
      }),
      finalize(() => {
        void clearSession(session, requestId);
      }),
    );

  return {
    result,
    complete,
  };
};

/**
 * Sets up `input` as a reactive managed write transaction.
 *
 * Uses `RxSession.executeWrite`, matching the `executeWrite` calls of the
 * non-reactive helpers, instead of the deprecated `writeTransaction`.
 *
 * Records are grouped with `bufferCount(limit)` and each group is emitted as an
 * array of mapped records. Errors are not rethrown on the stream: they are
 * handed to `errorHandler` (together with `complete`) and then emitted as a
 * value. Errors recognised as a user abort are first flagged with
 * `ignore = true` and `code = ABORTED_ERROR_CODE`. The session is unregistered
 * and closed when the stream finalizes.
 *
 * @param input Cypher text to run.
 * @returns `result`, the observable described above, and `complete`, which
 *   signals the record stream to stop (it is also the request's cancel callback).
 * @throws Error('No session') when no reactive session can be opened.
 */
const writeRxTransaction = async (
  input: string,
  {
    parameters = {},
    type = SYSTEM_QUERY,
    database,
    requestId = v4(),
    timeout = null,
    limit = MAX_QUERY_RESULT_LIMIT,
    errorHandler,
  }: WriteRxTransactionParams = {},
) => {
  const session = getRxSession(getDatabaseName(database));

  if (isNil(session)) {
    throw new Error('No session');
  }

  const config = getConfig(type);

  // Only pass a timeout key when one was actually requested.
  const extendedConfig =
    config !== null
      ? {
          ...(timeout != null ? { timeout } : {}),
          ...config,
        }
      : undefined;

  // Emitting on `destroy` ends the record stream through takeUntil below.
  const destroy = new Subject();
  const complete = () => {
    destroy.next(null);
    destroy.complete();
  };

  addToSessions(session, requestId, complete);

  const result = session
    .executeWrite(
      (tx) =>
        tx
          .run(input, parameters)
          .records()
          .pipe(
            bufferCount(limit),
            map((data) => {
              log.debug('writeRxTransaction query', requestId, input, parameters);
              log.trace('writeRxTransaction result', requestId, data);
              return data.map(mappings.recordMapper);
            }),
            takeUntil(destroy),
          ),
      extendedConfig,
    )
    .pipe(
      catchError((error) => {
        if (isQueryAbortedReactive(error)) {
          error.ignore = true;
          error.code = ABORTED_ERROR_CODE;
        }
        errorHandler?.(error, complete);
        return of(error);
      }),
      finalize(() => {
        void clearSession(session, requestId);
      }),
    );

  return {
    result,
    complete,
  };
};

/**
 * Runs `input` in a managed read transaction on a session requested for the system database.
 *
 * The session is tracked under `requestId` while the query runs and is always
 * unregistered and closed afterwards.
 *
 * @returns The raw driver result.
 * @throws Error('No session') when no session can be opened.
 * @throws The original error; bolt-compatible errors recognised as an abort are
 *   flagged with `ignore = true` first.
 */
const readSystemTransaction = async (
  input: string,
  { parameters = {}, type = SYSTEM_QUERY, requestId = v4() }: BaseQueryParams = {},
) => {
  const systemSession = getSession(SYSTEM_DATABASE);

  if (isNil(systemSession)) {
    throw new Error('No session');
  }

  const txConfig = getConfig(type) ?? undefined;

  addToSessions(systemSession, requestId);

  try {
    const outcome = await systemSession.executeRead((tx) => tx.run(input, parameters), txConfig);
    log.debug('readSystemTransaction query', requestId, input, parameters);
    log.trace('readSystemTransaction result', requestId, outcome);
    return outcome;
  } catch (error) {
    if (isBoltCompatibleError(error) && isQueryAbortedNoReactive(error)) {
      error.ignore = true;
    }
    throw error;
  } finally {
    await clearSession(systemSession, requestId);
  }
};

/**
 * Runs `input` in a managed write transaction on the resolved database.
 *
 * The session is tracked under `requestId` while the query runs and is always
 * unregistered and closed afterwards. Errors propagate unchanged.
 *
 * @returns The raw driver result.
 * @throws Error('No session') when no session can be opened.
 */
const writeTransaction = async (
  input: string,
  { parameters = {}, type = SYSTEM_QUERY, database, requestId = v4() }: BaseQueryParams = {},
) => {
  const targetDatabase = getDatabaseName(database);
  const session = getSession(targetDatabase);

  if (isNil(session)) {
    throw new Error('No session');
  }

  const txConfig = getConfig(type) ?? undefined;

  addToSessions(session, requestId);

  try {
    const outcome = await session.executeWrite((tx) => tx.run(input, parameters), txConfig);
    log.debug('writeTransaction query', requestId, input, parameters);
    log.trace('writeTransaction result', requestId, outcome);
    return outcome;
  } finally {
    await clearSession(session, requestId);
  }
};

/**
 * Runs `input` inside a managed write transaction and then deliberately fails it.
 *
 * After the query has run, the transaction work throws an error whose message
 * is `MANAGED_ROLLBACK_ERROR`, so the work never completes successfully. The
 * session is always unregistered and closed afterwards.
 *
 * @throws Error('No session') when no session can be opened.
 * @throws Whatever the managed transaction rejects with.
 */
const writeAndRollbackTransaction = async (
  input: string,
  { parameters = {}, type = SYSTEM_QUERY, database, requestId = v4() }: BaseQueryParams = {},
) => {
  const targetDatabase = getDatabaseName(database);
  const session = getSession(targetDatabase);

  if (isNil(session)) {
    throw new Error('No session');
  }

  const txConfig = getConfig(type) ?? undefined;

  addToSessions(session, requestId);

  try {
    log.trace('writeAndRollbackTransaction query', requestId, input, parameters);

    return await session.executeWrite(async (tx) => {
      await tx.run(input, parameters);
      // Failing the unit of work on purpose keeps it from completing normally.
      throw new Error(MANAGED_ROLLBACK_ERROR);
    }, txConfig);
  } finally {
    await clearSession(session, requestId);
  }
};

/**
 * Runs `input` in a managed write transaction on a session requested for the system database.
 *
 * The session is tracked under `requestId` while the query runs and is always
 * unregistered and closed afterwards. Errors propagate unchanged.
 *
 * @returns The raw driver result.
 * @throws Error('No session') when no session can be opened.
 */
const writeSystemTransaction = async (
  input: string,
  { parameters = {}, type = SYSTEM_QUERY, requestId = v4() }: BaseQueryParams = {},
) => {
  const systemSession = getSession(SYSTEM_DATABASE);

  if (isNil(systemSession)) {
    throw new Error('No session');
  }

  const txConfig = getConfig(type) ?? undefined;

  addToSessions(systemSession, requestId);

  try {
    const outcome = await systemSession.executeWrite((tx) => tx.run(input, parameters), txConfig);
    log.debug('writeSystemTransaction query', requestId, input, parameters);
    log.trace('writeSystemTransaction result', requestId, outcome);
    return outcome;
  } finally {
    await clearSession(systemSession, requestId);
  }
};

/**
 * Cancels the in-flight request identified by `requestId`.
 *
 * A registered abort handler takes precedence: it is called and then
 * unregistered. Otherwise the tracked session's cancel callback is awaited.
 *
 * @throws BoltError (flagged `ignore: true`) when nothing is registered for the id.
 */
const cancelTransaction = async (requestId: string) => {
  const abortHandler = asyncSessionToAbortHandler[requestId];
  const trackedSession = sessions.find(({ id }) => id === requestId);

  if (abortHandler !== undefined) {
    abortHandler();
    // eslint-disable-next-line  @typescript-eslint/no-dynamic-delete
    delete asyncSessionToAbortHandler[requestId];
    return;
  }

  if (trackedSession == null) {
    throw new BoltError({ message: ERROR_REQUEST_NOT_FOUND, ignore: true });
  }

  log.debug(`cancelling transaction with requestId: ${requestId}`);
  await trackedSession.onCancel();
};

/**
 * Public surface of this module, exported as one object: the query and
 * transaction helpers, driver/database setters, request cancellation, the
 * liveness check, the imported `neo4j` driver module and the list of query type tags.
 */
export default {
  runQuery,
  readTransaction,
  readRxTransaction,
  writeTransaction,
  writeRxTransaction,
  writeAndRollbackTransaction,
  readSystemTransaction,
  writeSystemTransaction,
  setDriver,
  setContainerApp,
  clearDriver,
  setCurrentDatabase,
  cancelTransaction,
  isAlive,
  neo4j,
  QueryTypes,
};
