import edgeIdFn from './edgeIdFn';
import { getAddDependencyEdgeFn } from './highlight-utils';
import { HighlightFunctionType } from './highlightFunctionType';

const highlightUpstream: HighlightFunctionType = (id, nodesMap, edgesList, config) => {
  const linkedIds = new Set<string>();
  const linkedEdges = new Set<string>();

  const addDependencyEdge = getAddDependencyEdgeFn(linkedIds, linkedEdges);

  const recurseUpstreamNodes = (currentId: string) => {
    if (linkedIds.has(currentId)) {
      return;
    }

    linkedIds.add(currentId);

    const currentNode = nodesMap[currentId];

    if (currentNode.upstreamNodes) {
      currentNode.upstreamNodes.forEach((upstream) => {
        const upstreamNode = nodesMap[upstream];

        if (!upstreamNode) {
          throw new Error(`Cannot find node with id '${upstream}' in nodes map.
          '${upstream}' was a member of the upstreamNodes array
          in '${currentNode.id}'`);
        }

        if (config?.edgeShouldBeSkipped && config.edgeShouldBeSkipped(upstreamNode, currentNode)) {
          return;
        }

        addDependencyEdge(upstreamNode, currentNode, config);

        if (currentNode.type === 'request_data_source') {
          // Request Data Source can produce cyclical dependencies
          // so we don't recurse down this path.
          return;
        }

        recurseUpstreamNodes(upstream);
      });
    }

    /**
     * If the current node is an ODFV, and it has
     * upstream dependencies...
     */
    if (currentNode.type === 'odfv' && currentNode.upstreamNodes) {
      const hasUpstreamFV =
        currentNode.upstreamNodes.filter((id) => {
          const node = nodesMap[id];

          return node.type === 'feature_view';
        }).length > 0;

      if (hasUpstreamFV) {
        /**
         * We assume that the ODFV is reading from the online store
         * so we add it to the list.
         */
        const hideOnlineStore = config && config.hideOnlineStore;

        if (!hideOnlineStore) {
          linkedEdges.add(edgeIdFn({ source: 'ONLINE_STORE', target: 'store-output' }));
          linkedIds.add('ONLINE_STORE');
        }

        /**
         * If any of its upstream FVs are materializing offline,
         * then we also show the line from the offline store
         * going out towards the ODFV.
         */
        const upstreamNodesHaveOfflineMaterialization = currentNode.upstreamNodes.find((upstreamId) => {
          const upstreamNode = nodesMap[upstreamId];

          return upstreamNode.type === 'feature_view' && upstreamNode.isOfflineMaterializationEnabled;
        });

        const hideOfflineStore = config && config.hideOfflineStore;

        if (upstreamNodesHaveOfflineMaterialization && !hideOfflineStore) {
          linkedIds.add('OFFLINE_STORE');
          linkedEdges.add(edgeIdFn({ source: 'OFFLINE_STORE', target: 'store-output' }));
        }
      }
    }
  };

  recurseUpstreamNodes(id);

  return {
    linkedIds,
    linkedEdges,
  };
};

export default highlightUpstream;
