/* eslint-disable @typescript-eslint/no-explicit-any */
import {
  KemuHubConnector,
  VersionCompareMode,
  ParentEventHandler,
  SetOutputsArgs,
  SetStateArgs,
  SubscribableHubCommand,
  GetDefaultStateHandler,
  SubscribeToServiceEventsHandler,
  UnsubscribeFromServiceHandler,
  CallProcessorFuncHandler,
  HubBroadcastEvent,
  WidgetState,
  WidgetType ,
  DeepReadOnly,
  DeepReadOnlyArray ,
  HubServiceState,
  InitializeServiceInstanceHandler,
  TerminateServiceInstanceHandler
} from '@kemu-io/kemu-core/types';
import EventEmitter from 'emittery';
import KemuCore from '@kemu-io/kemu-core';
import { createImageDataFromInfo, isImageDataType } from '@kemu-io/kemu-core/common/utils';
import { findRecipe } from '@kemu-io/kemu-core/common/recipeCache';
import {
  GetServiceContentsArgs,
  GetServiceContentsResponse,
  InitInstanceArgs,
  TerminateInstanceArgs,
  KemuHubFunctions,
  SerializableServiceInfo,
  BroadcastEvent,
  TargetOutput,
  SetDependencyPathArgs,
  ServiceToServiceFunctions,
  GetDependencyPathArgs,
  DataType,
  ImageDataLike,
  Data,
} from '@kemu-io/hs-types';
import KemuRemoteInvoke, { ExecuteConfig, HandleRemoteInvokeFn } from '@kemu-io/hs/ri';
import { satisfies } from 'compare-versions';
import kemuLink from './link';
import { buildAckResponse, onAckRequest, onServicesListChanged, rebuildKemuTypes } from './helpers';
import { HubConnectionStatus } from '@src/types/kemuHub/hub_t';
import createLogger from '@common/logger';
import { KemuHubServiceId } from '@common/constants';

type ConnectionEvent = 'connected' | 'acknowledged' | 'disconnected';
type ValidEvent = SubscribableHubCommand | ConnectionEvent;
type VoidCallback = (...args: any[]) => void;

const logger = createLogger('kemuHubConnector');
const ri = new KemuRemoteInvoke('kweb');
// @ts-expect-error ignore
ri.setLogger(logger.log);

const _ee = new EventEmitter();
/** keeps custom subscriptions to certain service events */
const serviceEmitter = new EventEmitter();

let connectorServiceId: number | null = null;
let lastServicesList: SerializableServiceInfo[] | undefined = undefined;
let currentRecipePoolId: string | null = null;

type CachedContentsRecord = {
  contents?: ArrayBuffer;
  contentsChecksum: string;
  lastRequestedAt: number;
}

/** 
 * keeps track of the latest service contents so that 
 * widgets can be painted even after services go offline.
 **/
const serviceContentsCache: Record<string, CachedContentsRecord> = {};

/**
 * Adds the given service contents to the cache.
 * @param serviceName the name of the service
 * @param version the version of the service
 * @param contents the binary contents to add
 * @returns the newly added record
 */
const createServiceContentsRecord = (serviceName: string, version: string, contents?: ArrayBuffer) => {
  const key = `${serviceName}_${version}`;
  const record = {
    contents,
    lastRequestedAt: Date.now(),
    contentsChecksum: '',
  };

  serviceContentsCache[key] = record;
  return record;
};

const getCachedServiceRecord = (serviceName: string, version: string): CachedContentsRecord | null => {
  const key = `${serviceName}_${version}`;
  const record = serviceContentsCache[key];
  return record || null;
};

const removeCachedServiceContentsRecord = (serviceName: string, version: string) => {
  const key = `${serviceName}_${version}`;
  delete serviceContentsCache[key];
};


const fetchServiceContents = async (serviceName: string, version: string): Promise<GetServiceContentsResponse<ArrayBuffer>[0] | null> => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return null;
  }

  const request: GetServiceContentsArgs = [{ serviceName, version }];
  const response = await ri.execute<GetServiceContentsResponse<ArrayBuffer>>(
    KemuHubFunctions.GetServiceContents,
    request,
    sendBuffer,
    connectorServiceId,
    KemuHubServiceId
  );

  return response[0];
};

/** Keeps track of calls to fetch contents */
const fetchContentsPromisesCache = new Map<string, Promise<ArrayBuffer | null>>();

/**
 * Checks the cache for the given service info and returns the last stored contents.
 * If no request has been made for the service, it fetches the contents from the hub
 * and stores it in the cache before returning it. Calling this function multiple times
 * for the same service will only trigger one request to the hub.
 * @param serviceName the name of the service to fetch
 * @param version the version of the service to fetch
 * @param bypassCache if true, the cache will be ignored and the contents will be fetched from the hub
 * regardless of the last request time.
 */
const getServiceContents = async (serviceName?: string, version?: string, bypassCache=false): Promise<ArrayBuffer | null> => {
  if (!serviceName || !version) { return null; }

  // If this method is called multiple times before the first
  // request is resolved, only one request will be made to the hub.
  const cacheKey = `${serviceName}:${version}`;
  if (!bypassCache) {
    const cachedPromise = fetchContentsPromisesCache.get(cacheKey);
    if (cachedPromise) {
      return cachedPromise;
    }
  }

  const record = getCachedServiceRecord(serviceName, version);
  // Attempt to fetch the contents from the hub
  if (!record || bypassCache) {
    const newRecord = record || createServiceContentsRecord(serviceName, version);

    const fetchPromise = (async () => {
      try {
        const response = await fetchServiceContents(serviceName, version);
        if (response) {
          newRecord.contents = response.uiContent;
          newRecord.contentsChecksum = response.uiContentsChecksum || '';
        }
        return newRecord.contents || null;
      } catch (e) {
        logger.error(`Failed to fetch service contents for ${serviceName} v${version}`, e);
        return null;
      } finally {
        // Ensure to remove the promise from the cache once it's settled
        fetchContentsPromisesCache.delete(cacheKey);
      }
    })();

    // Cache the promise before returning it
    fetchContentsPromisesCache.set(cacheKey, fetchPromise);
    return fetchPromise;
  }

  return record?.contents || null;
};

/**
 * Use it as a widget identifier when custom app components use
 * the KL protocol to execute services functions.
 * This replaces the widgetId in the communication protocol.
 */
export const KemuAppIdIdentifier = 'app';

const sendBuffer = kemuLink.sendBuffer;

const onDisconnect = () => {
  logger.log('Disconnected from the server');
  connectorServiceId = null;
  ri.rejectAllPending('Disconnected from the server');
  _ee.emit('disconnected');
};

const onConnected = () => {
  logger.log('Connected to the server');
  _ee.emit('connected');
};

const setCurrentRecipePoolId = (id: string) => {
  currentRecipePoolId = id;
};

const getRecipePoolId = () => {
  return currentRecipePoolId;
};

const getEventKey = (
  eventType: 'broadcast' | 'setOutputs',
  serviceName: string,
  serviceVersion: string,
  widgetId: string
) => {
  const eventKey = `${eventType}_${serviceName}_${serviceVersion}_${widgetId}`;
  return eventKey;
};

/**
 * Finds all the HubService widgets in the current recipe
 * and issues subscriptions to their respective services if they are emitters.
 */
const reIssueWidgetsSubscriptions = async () => {
  const currentRecipePoolId = getRecipePoolId();
  if (currentRecipePoolId) {
    const things = findRecipe(currentRecipePoolId)?.blocks;
    if (things) {
      // const hubSubscriptionsToCreate: HubServiceSubscription [] = [];
      // Keeps track of which services have already been subscribed to
      const subscribedMap: Record<string, boolean> = {};

      for (const thingId in things) {
        const thing = things[thingId];
        for (const widgetId in thing.gates) {
          const widget = thing.gates[widgetId];
          if (widget.type === WidgetType.hubService) {
            const widgetState = widget.state as HubServiceState;
            if (widgetState.service?.eventEmitter) {
              const serviceKey = `${widgetState.service.name}_${widgetState.service.version}`;
              const alreadyIssued = subscribedMap[serviceKey];
              if (!alreadyIssued) {
                try {
                  logger.log(`Re-issuing subscription for service ${widgetState.service.name} (${widgetState.service.version})`);
                  // mark the service as subscribed
                  subscribedMap[serviceKey] = true;

                  await subscribeToServiceEvents({
                    listener: {
                      recipeId: currentRecipePoolId,
                    },
                    targetService: {
                      serviceName: widgetState.service.name,
                      version: widgetState.service.version,
                    }
                  });
                } catch (e) {
                  logger.error(`Failed to re-issue subscription for service ${widgetState.service.name} (${widgetState.service.version})`, e);
                }
              }
            }
          }
        }
      }
    }
  }
};

/**
 * Called when the hub emits a `services-changed` event.
 * It fetches the latest list of services and updates the cache
 * before notifying any subscribers.
 */
const handleServicesChangedList = async () => {
  logger.log('Services list changed');
  // Caches the latest list of services
  const updatedList = await getServices();
  // Notify any subscribers that the services list has changed
  _ee.emit('services-changed', updatedList);
};

const start = async () => {
  kemuLink.onConnected(onConnected);
  kemuLink.onDisconnected(onDisconnect);
  // Pass any incoming message to the remote invoker for decoding
  kemuLink.onMessageReceived(({ json, transmission }) => {
    ri.processMessage('websocket', sendBuffer, transmission, json);
  });

  ri.registerFunction(KemuHubFunctions.SetState, handleSetState);
  ri.registerFunction(KemuHubFunctions.SetOutputs, handleSetOutputs);
  ri.registerFunction(KemuHubFunctions.BroadcastEvent, handleBroadcastEvent);
  ri.registerFunction(KemuHubFunctions.HubBroadcastEvent, handleHubBroadcastEvent);

  ri.registerFunction(ServiceToServiceFunctions.SetDependencyPath, handleSetDependencyPath);
  ri.registerFunction(ServiceToServiceFunctions.GetDependencyPath, handleGetDependencyPath);

  kemuLink.onCommand((command) => {
    // Acknowledgement handling
    onAckRequest(command, (serviceId) => {
      if (serviceId) {
        // Reply with ACK
        const sent = kemuLink.sendCommand(buildAckResponse(serviceId));
        if (sent) {
          logger.log('Hub Link acknowledged');
          connectorServiceId = serviceId;
          _ee.emit('acknowledged');

          // Fetch the list of services available on the hub
          // Removed on 21/Jul/2024: Hooks are now responsible for fetching the services list.
          // getServices().catch(e => {
          //   logger.error('Failed to fetch initial services list', e);
          // });

          // Re-issue subscriptions
          reIssueWidgetsSubscriptions();
        }
      } else {
        logger.error('Hub sent ACK request without service id');
      }
    });

    // Services list changed
    onServicesListChanged(command, handleServicesChangedList);
  });

  await kemuLink.connect();
};

const stop = async () => {
  await kemuLink.disconnect();
};

const isConnected = () => {
  return kemuLink.isConnected();
};

const isReady = () => {
  return !!connectorServiceId && kemuLink.isConnected();
};

const getStatus = (): HubConnectionStatus => {
  if (connectorServiceId) { return 'acknowledged'; }
  if (kemuLink.isConnected()) { return 'connected'; }
  return 'disconnected';
};


/**
 * Handles a broadcast event from the hub.
 */
const handleHubBroadcastEvent: HandleRemoteInvokeFn = async (event) => {
  const broadcast = event.args[0] as HubBroadcastEvent;
  const broadcastEvent = getEventKey(
    'broadcast',
    `hub_${broadcast.type}`,
    KemuHubServiceId.toString(),
    ''
  );

  await serviceEmitter.emit(broadcastEvent, broadcast);
};

/**
 * Sends the event to any widget instance that is listening.
 */
const handleBroadcastEvent: HandleRemoteInvokeFn = async (event) => {
  const broadcast = event.args[0] as BroadcastEvent;

  // Fix output types
  for (const output of (broadcast.outputs || [])) {
    if (output.type === DataType.ImageData) {
      if (isImageDataType(output.value)) {
        output.value = createImageDataFromInfo(output.value as ImageDataLike);
      }
    }
  }

  const currentRecipeId = getRecipePoolId();
  if (currentRecipeId) {
    // Find in all the running things, all widgets that are subscribed to the given service
    const recipe = findRecipe(currentRecipeId);
    if (recipe) {

      /** emits an event using the given target */
      const notifyBroadcast = async (targetId: string) => {
        const broadcastEvent = getEventKey(
          'broadcast',
          broadcast.source.serviceName,
          broadcast.source.serviceVersion,
          targetId,
        );

        try {
          await serviceEmitter.emit(broadcastEvent, broadcast);
        } catch (e) {
          logger.error(`Failed to emit broadcast event for target ${targetId}`, e);
        }
      };

      // Emit an app broadcast event which would not be tied to a specific widget.
      // This covers cases when specific app components are listening for broadcasts.
      await notifyBroadcast(KemuAppIdIdentifier);

      for (const thingId in recipe.blocks) {
        const thing = recipe.blocks[thingId];
        for (const widgetId in thing.gates) {
          const widget = thing.gates[widgetId];
          if (widget.disabled) { continue; } // Skip disabled widgets

          const widgetState = widget.state as HubServiceState;
          if (
            widget.type === WidgetType.hubService
            && widgetState.service?.name === broadcast.source.serviceName
            && widgetState.service?.version === broadcast.source.serviceVersion
            && widgetState.variantId === broadcast.variantId
          ) {

            // Broadcast to listeners before triggering next gate
            await notifyBroadcast(widgetId);

            // Dispatch to outputs
            for (const output of (broadcast.outputs || [])) {
              if (output.value !== null && output.value !== undefined) {
                const data: Data = {
                  type: output.type,
                  value: output.value,
                  timestamp: Date.now(),
                };

                logger.log(`Sending data to output ${output.name} requested by messageId ${event.messageId}`);
                await KemuCore.triggerNextGlobalWidget(
                  currentRecipeId,
                  widgetId,
                  output.name,
                  data,
                  data,
                );
              }
            }
          }
        }
      }
    }
  }
};

const handleSetOutputs: HandleRemoteInvokeFn = async (event) => {
  const eventArgs = event.args[0] as SetOutputsArgs<Record<string, unknown>>;
  const { finalState, recipeId, widgetId } = eventArgs;

  // Read the current state from the recipe
  const currentState = KemuCore.getGlobalWidgetState<HubServiceState>(recipeId, widgetId);
  let hubState = currentState as HubServiceState | null;

  // If a new state was provided, update recipe
  if (finalState && hubState && currentState) {
    hubState = {
      ...currentState,
      customState: finalState,
    };

    KemuCore.setGlobalWidgetState(recipeId, widgetId, hubState, true);
  }

  // Dispatch setOutput events interceptions
  if (hubState?.service) {
    try {
      const eventKey = getEventKey(
        'setOutputs',
        hubState.service.name,
        hubState.service.version,
        widgetId
      );
      await serviceEmitter.emit(eventKey, eventArgs.outputs);
    } catch (e) {
      logger.error(`Failed to emit setOutputs event for widget ${widgetId}`, e);
    }
  }


  // Process the outputs with a non-null value
  for (const output of eventArgs.outputs) {
    if (output.value !== null && output.value !== undefined) {

      if (output.type === DataType.ImageData) {
        // Transform value back into an image data
        if (isImageDataType(output.value)) {
          output.value = createImageDataFromInfo(output.value as ImageDataLike);
        } else {
          logger.error(`Invalid ImageData value received for output ${output.name}. Expected an ImageData-like object`);
          continue;
        }
      } else if (output.type === DataType.JsonObj) {
        // Rebuild the object with the correct types
        rebuildKemuTypes(output.value);
      }

      const data: Data = {
        type: output.type,
        value: output.value,
        timestamp: Date.now(),
      };

      logger.log(`Sending data to output ${output.name} requested by messageId ${event.messageId}`);
      await KemuCore.triggerNextGlobalWidget(
        recipeId,
        widgetId,
        output.name,
        data,
        data,
      );
    }
  }

  event.reply({ success: [] });
};


const handleSetState: HandleRemoteInvokeFn = async (event) => {
  const eventArgs = event.args[0] as SetStateArgs<Record<string, unknown>>;
  const { newState, recipeId, widgetId } = eventArgs;
  const currentState = KemuCore.getGlobalWidgetState<HubServiceState>(recipeId, widgetId);
  if (currentState) {
    // Services have their own custom state, here we
    // update the portion of the state that is specific to the service
    const finalState: HubServiceState = {
      ...currentState,
      customState: {
        ...currentState.customState,
        ...newState,
      }
    };

    KemuCore.setGlobalWidgetState(recipeId, widgetId, finalState, true);
    return event.reply({ success: [] });
  }
};

/**
 * Invoked when a remote service is trying to register a dependency for a given service instance.
 */
const handleSetDependencyPath: HandleRemoteInvokeFn = async ({ args, reply }) => {
  const config = args[0] as SetDependencyPathArgs;
  const { recipeId, widgetId, path, key } = config;

  logger.log(`Received request to set a dependency path for "${key}" on widget "${widgetId}" in recipe "${recipeId}"`);
  const currentState = KemuCore.getGlobalWidgetState<HubServiceState>(recipeId, widgetId);
  if (currentState) {
    const newState: HubServiceState = { ...currentState };

    newState.dependencies = newState.dependencies || {};
    newState.dependencies[key] = { path };
    KemuCore.setGlobalWidgetState(recipeId, widgetId, newState, true);
  }

  return reply({ success: [] });
};

/**
 * Invoked when a remote service is trying to retrieve a dependency path for a given service instance.
 */
const handleGetDependencyPath: HandleRemoteInvokeFn = async ({ args, reply }) => {
  const config = args[0] as GetDependencyPathArgs;
  const { recipeId, widgetId, key } = config;

  logger.log(`Received request to get a dependency path for "${key}" on widget "${widgetId}" in recipe "${recipeId}"`);
  const currentState = KemuCore.getGlobalWidgetState<HubServiceState>(recipeId, widgetId);
  if (currentState) {
    const path = currentState.dependencies?.[key]?.path;
    logger.log(`Returning dependency path for "${key}" on widget "${widgetId}": "${path}"`);
    return reply({ success: [path || null] });
  } else {
    return reply({ error: 'Widget not found', errCode: 'WIDGET_NOT_FOUND' });
  }
};

/**
 * Fetches the current list of services from the hub.
 * @returns the latest list of services available on the hub.
 */
const getServices = async (): Promise<SerializableServiceInfo[]> => {
  logger.log(`Requesting services on ${new Date().toISOString()}`);

  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return [];
  }

  const response = await ri.execute<SerializableServiceInfo[]>(
    KemuHubFunctions.GetServices,
    [],
    sendBuffer,
    connectorServiceId,
    KemuHubServiceId
  );

  // Cache the list for faster look up
  lastServicesList = response;

  // Check if any service has change its contents and if so, clear out the cache
  for (const service of response) {
    // Ignore internal services
    if (service.internal) { continue; }
    const record = getCachedServiceRecord(service.name, service.version);
    if (!record || record?.contentsChecksum !== service.uiContentChecksum) {
      try {
        // Add record if missing
        const targetRecord = record || createServiceContentsRecord(service.name, service.version);
        logger.log(`Service ${service.name} v${service.version} contents have changed, fetching new contents`);
        const newContents = await fetchServiceContents(service.name, service.version);
        if (newContents) {
          targetRecord.contents = newContents.uiContent;
          targetRecord.contentsChecksum = service.uiContentChecksum || '';
        }
      } catch (e) {
        logger.error(`Failed to update service contents for ${service.name} v${service.version}`, e);
      }
    }
  }
  return response;
};



/**
 * @returns a copy of the last services list fetched from the hub.
 */
const getCachedServices = () => {
  return [...(lastServicesList || [])];
};

/**
 * @returns true if the services list has been cached at least once.
 */
const areServicesCached = () => {
  return !!lastServicesList;
};

const onParentEvent: ParentEventHandler = async (event) => {
  const { targetServiceSessionId } = event;

  if (!connectorServiceId) {
    logger.error('No target service session id provided');
    return;
  }

  logger.log(`Forwarding "onParentEvent" to service ${targetServiceSessionId}`);
  const result = await ri.execute(
    KemuHubFunctions.OnParentEvent,
    [event],
    sendBuffer,
    connectorServiceId,
    targetServiceSessionId,
    event.config,
  );

  return result;
};

/* const getInputs: GetInputNamesHandler = async (serviceSessionId, environment) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return [];
  }

  const getInputsArgs: GetInputsArgs = [environment];

  const inputs = await ri.execute<WidgetPort[]>(
    KemuHubFunctions.GetInputs,
    getInputsArgs,
    sendBuffer,
    connectorServiceId,
    serviceSessionId,
  );

  // Translate DataTypeStr into DataType
  // const fixedInputs = inputs.map((input) => {
  //   const types = Array.isArray(input.type) ? input.type : [input.type];
  //   const numericTypes = types.map((t) => DataType[t]);
  //   return {
  //     ...input,
  //     type: numericTypes,
  //   };
  // });

  return inputs;
}; */

/* const getOutputs: GetOutputNamesHandler = async (serviceSessionId: number, environment) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return [];
  }

  const getOutputsArgs: GetOutputsArgs = [environment];

  const outputs = await ri.execute<WidgetPort[]>(
    KemuHubFunctions.GetOutputs,
    getOutputsArgs,
    sendBuffer,
    connectorServiceId,
    serviceSessionId,
  );

  // Translate DataTypeStr into DataType
  // const fixedOutputs = outputs.map((output) => {
  //   const types = Array.isArray(output.type) ? output.type : [output.type];
  //   const numericTypes = types.map((t) => DataType[t]);
  //   return {
  //     ...output,
  //     type: numericTypes,
  //   };
  // });

  return outputs;
}; */

const getDefaultState: GetDefaultStateHandler = async <T extends WidgetState>(serviceSessionId: number): Promise<T> => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return {} as T;
  }

  const [serviceState] = await ri.execute<[T]>(
    KemuHubFunctions.GetDefaultState,
    [],
    sendBuffer,
    connectorServiceId,
    serviceSessionId,
  );

  return serviceState;
};

const callProcessorHandler: CallProcessorFuncHandler = async (
  serviceSessionId,
  environment,
  name,
  data,
  config
) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot get services.');
    return null;
  }

  const [response] = await ri.execute(
    KemuHubFunctions.UIEvent,
    [environment, name, data],
    sendBuffer,
    connectorServiceId,
    serviceSessionId,
    config,
  );

  return response;
};

const onCommand = (command: SubscribableHubCommand, cb: VoidCallback) => {
  const handler = (...args: any[]) => {
    cb(...args);
  };

  _ee.on(command, handler);

  return () => _ee.off(command, handler);
};

const subscribeToServiceEvents: SubscribeToServiceEventsHandler = async (subscription) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot subscribe to services.');
    return;
  }

  return ri.execute(
    KemuHubFunctions.SubscribeToService,
    [subscription],
    sendBuffer,
    connectorServiceId,
    KemuHubServiceId,
  );
};

const unsubscribeFromServiceEvents: UnsubscribeFromServiceHandler = async (subscription) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot subscribe to services.');
    return;
  }

  return ri.execute(
    KemuHubFunctions.UnsubscribeFromService,
    [subscription],
    sendBuffer,
    connectorServiceId,
    KemuHubServiceId,
  );
};

const onBroadcastEvent = <T extends BroadcastEvent | HubBroadcastEvent = BroadcastEvent>(
  widgetId: string,
  serviceName: string,
  serviceVersion: string,
  cb: (event: DeepReadOnly<T>) => void | Promise<void>
) => {
  const eventKey = getEventKey('broadcast', serviceName, serviceVersion, widgetId);
  const unsubscribe = serviceEmitter.on(eventKey, cb);
  return unsubscribe;
};

const onSetOutputsEvent = (
  widgetId: string,
  serviceName: string,
  serviceVersion: string,
  cb: (outputs: DeepReadOnlyArray<TargetOutput>) => void | Promise<void>
) => {
  const eventKey = getEventKey('setOutputs', serviceName, serviceVersion, widgetId);
  const unsubscribe = serviceEmitter.on(eventKey, cb);
  return unsubscribe;
};


// eslint-disable-next-line @typescript-eslint/no-explicit-any
const executeHubFunction = async <T>(name: string, args: any[], config?: ExecuteConfig): Promise<T | null> => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot execute function.');
    return null;
  }

  return ri.execute<T>(
    name,
    args,
    sendBuffer,
    connectorServiceId,
    KemuHubServiceId,
    config,
  );
};

const initializeServiceInstance: InitializeServiceInstanceHandler = async (config) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot execute function.');
    return;
  }

  const widgetState = KemuCore.getGlobalWidgetState<HubServiceState>(config.recipeId, config.widgetId);
  if (!widgetState) {
    throw new Error('Widget or recipe pool not found. Cannot initialize service instance.');
  }

  const depKeys = Object.keys(widgetState.dependencies || {});
  const dependenciesMap = depKeys.reduce((acc, key) => {
    acc[key] = widgetState.dependencies?.[key]?.path || null;
    return acc;
  }, {} as Record<string, string | null>);

  const initArgs: InitInstanceArgs = [{
    currentState: widgetState.customState,
    recipeId: config.recipeId,
    widgetId: config.widgetId,
    variantId: config.variantId,
    recipeType: config.recipeType,
    currentDependencies: dependenciesMap,
  }];

  const [updatedState] = await ri.execute<[WidgetState | null]>(
    KemuHubFunctions.InitializeInstance,
    initArgs,
    sendBuffer,
    connectorServiceId,
    config.sessionId,
  );

  if (updatedState) {
    // Read the current state from the recipe in case it was updated while the call was in progress
    const widgetState = KemuCore.getGlobalWidgetState<HubServiceState>(config.recipeId, config.widgetId);
    if (widgetState) {
      const finalState: HubServiceState = {
        ...widgetState,
        customState: updatedState,
      };

      KemuCore.setGlobalWidgetState(config.recipeId, config.widgetId, finalState, true);
    }

  }
};

const terminateServiceInstance: TerminateServiceInstanceHandler = async (
  serviceSessionId, widgetId, recipeId, variantId,
) => {
  if (!connectorServiceId) {
    logger.error('Hub Link has not been acknowledged. Cannot execute function.');
    return;
  }

  const widgetState = KemuCore.getGlobalWidgetState<HubServiceState>(recipeId, widgetId);
  if (!widgetState) {
    throw new Error('Widget or recipe pool not found. Cannot initialize service instance.');
  }

  const endArgs: TerminateInstanceArgs = [{
    currentState: widgetState.customState,
    recipeId,
    widgetId,
    variantId,
  }];

  await ri.execute<[HubServiceState | null]>(
    KemuHubFunctions.TerminateInstance,
    endArgs,
    sendBuffer,
    connectorServiceId,
    serviceSessionId,
  );
};

/**
   * Finds a service that matches the given name and compatible version.
   * @param name the name of the version to look for
   * @param targetVersion the minimum version to look for
   * @param compareMode the comparison mode to use when comparing versions. Defaults to '^`, which matches
   * patch and minor versions.
   * @param bypassCache if true, the current service list will be fetched from the hub instead of the cache.
   * If cache is not available, the services list will be fetched from the hub as well regardless of this flag.
   * @returns the service info if found, null otherwise.
   */
const getCompatibleService = async (name: string, targetVersion: string, compareMode: VersionCompareMode = '^', bypassCache?: boolean): Promise<SerializableServiceInfo | null> => {
  const validModes: VersionCompareMode[] = ['^', '~', '>=', '>', '<=', '<', '='];
  if (!validModes.includes(compareMode)) {
    throw new Error(`Invalid compare mode "${compareMode}". Must be one of ${validModes.join(', ')}`);
  }

  const useCache = areServicesCached();
  const services = (bypassCache || !useCache) ? await getServices() : getCachedServices();
  for (const service of services) {
    if (service.name === name) {
      // Exact match
      if (service.version === targetVersion) { return service; }
      // Semantic versioning match
      const isMatch = satisfies(service.version, `${compareMode}${targetVersion}`);
      if (isMatch) { return service; }
    }
  }


  return null;
};

const getConnector = () => {
  const hubConnector: KemuHubConnector = {
    getServices,
    getServiceContents,
    onParentEvent,
    // getInputs,
    // getOutputs,
    onCommand,
    getDefaultState,
    getCachedServices,
    areServicesCached,
    subscribeToServiceEvents,
    unsubscribeFromServiceEvents,
    callProcessorHandler,
    onBroadcastEvent,
    onSetOutputsEvent,
    executeHubFunction,
    initializeServiceInstance,
    terminateServiceInstance,
    getCompatibleService,
  };

  return hubConnector;
};


/**
 * Registers a callback function to be called when the specified event occurs.
 * @param event The event to listen for. Can be either 'connected' or 'disconnected'.
 * @param cb The callback function to be called when the event occurs.
 */
const on = (event:  ValidEvent, cb: VoidCallback ) => {
  _ee.on(event, cb);
};

/**
 * Removes a previously registered callback function from the specified event.
 * @param event The event to remove the callback from. Can be either 'connected' or 'disconnected'.
 * @param cb The original callback function used when registering the event.
 */
const off = (event:  ValidEvent, cb: VoidCallback ) => {
  _ee.off(event, cb);
};


export default {
  start,
  stop,
  on,
  off,
  isConnected,
  getConnector,
  isReady,
  getStatus,
  setCurrentRecipePoolId,
  reIssueWidgetsSubscriptions,
};
