import {
  DataSourceFCO,
  FCO,
  FCOType,
  FeatureServiceFCO,
  FeatureViewFCO,
  FeatureViewFCOFields,
  FeatureViewFCOType,
  StreamingDataSourceType,
  TemporalAggregateFeatureForFCO,
  WorkspaceFCOContainer,
} from '../../../../core/types/fcoTypes';
import { InputNode, PipelineNode, PipelineNodeTypes } from '../../../../service/Pipeline';
import {
  AggregationNode,
  DataFlowEdge,
  DataFlowNode,
  DataFlowPathProperties,
  DataFlowSpec,
  DataFlowTemplateType,
  DataSourceNode,
  EmbeddingModelNode,
  FeatureServiceNode,
  FeatureViewNode,
  ModelInferenceNode,
  ModelTrainerNode,
  NodesMapType,
  ODFVNode,
  RawBatchNode,
  RawStreamNode,
  RequestNode,
  TransformationNode,
} from './dataFlowTypes';
import { ODFVSchemaEntry, ODFVSchemaRow } from './DetailsPanel/detailsPanelQueryTypes';
import { getMaterializationConfig } from './helpers';
import edgeIdFn from './highlightLogic/edgeIdFn';
import { mergePathProperties } from './highlightLogic/highlight-utils';

/**
 * Finds the request context of the request data source used by a feature view.
 *
 * Walks the leaf nodes of the feature view's pipeline and stops at the first
 * one whose `nodeType` is `'request_data_source_node'`.
 *
 * @param fv - Feature view whose pipeline leaf nodes are inspected.
 * @returns The matching leaf's `node.request_context`, or `undefined` when the
 *   feature view has no pipeline leaf nodes or none of them is a request data source.
 * @throws Error when `fv` is falsy.
 */
const getRequestDataSourceFromODFV = (fv: FeatureViewFCO) => {
  if (!fv) {
    throw new Error(`fv is undefined. This should not happen...`);
  }

  const pipelineLeaves = fv.pipeline?.allLeafNodes;

  if (!pipelineLeaves) {
    return undefined;
  }

  for (const leaf of pipelineLeaves) {
    // Leaves may be empty entries, hence the optional access.
    if (leaf?.nodeType === 'request_data_source_node') {
      return leaf.node.request_context as Record<string, unknown>;
    }
  }

  return undefined;
};

/**
 * Resolves the ids of the feature views that `fv` depends on.
 *
 * Each entry of `fv.dependentFeatureViews` carries the hash of the feature view
 * it refers to in `node.innerNodeId`; that hash is translated through `hashToId`.
 * Entries without a hash, or whose hash is not present in `hashToId`, are skipped.
 *
 * @param fv - Feature view whose dependencies are resolved.
 * @param hashToId - Lookup from FCO hash to the id used by the dataflow graph.
 * @returns The resolved ids, in the order of `fv.dependentFeatureViews`.
 * @throws Error when `fv` is falsy.
 */
const getDependentFeatureViewIds = (fv: FeatureViewFCO, hashToId: Record<string, string>): string[] => {
  if (!fv) {
    throw new Error(`fv is undefined. This should not happen...`);
  }

  const results: string[] = [];

  (fv.dependentFeatureViews || []).forEach((inputNode) => {
    const hash = inputNode.node.innerNodeId;

    if (!hash) {
      return;
    }

    const id = hashToId[hash];

    // A missing key yields `undefined`, not `null`, so the check has to cover
    // both; otherwise `undefined` would end up in the returned `string[]`.
    if (id != null) {
      results.push(id);
    }
  });

  return results;
};

/**
 * Derives the materialization flags of an edge from the feature view that
 * determines it.
 *
 * - Stream feature views: the batch flag mirrors the offline materialization
 *   config unless the edge goes from a raw stream node to the online store; the
 *   stream flag mirrors the online materialization config unless the edge goes
 *   from a raw batch node to the offline store.
 * - Batch feature views: the batch flag is set from "offline OR online"
 *   materialization, except on edges that start at a raw stream node.
 * - Any other feature view type yields an empty object.
 *
 * @returns The path properties; a flag is only present when its rule applied.
 */
const getFeatureViewDataflowPathProperties = ({
  sourceNodeType,
  targetNodeType,
  determiningFeatureView,
}: {
  sourceNodeType: DataFlowNode['type'];
  targetNodeType: DataFlowNode['type'];
  determiningFeatureView: FeatureViewFCO;
}): DataFlowPathProperties => {
  const properties: DataFlowPathProperties = {};
  const { featureViewType } = determiningFeatureView;

  if (featureViewType === 'stream') {
    const isRawStreamToOnlineStore = sourceNodeType === 'raw_stream_node' && targetNodeType === 'online_store';
    const isRawBatchToOfflineStore = sourceNodeType === 'raw_batch_node' && targetNodeType === 'offline_store';

    if (!isRawStreamToOnlineStore) {
      properties.isBatchMaterializationPath = getMaterializationConfig(determiningFeatureView, 'offline');
    }

    if (!isRawBatchToOfflineStore) {
      properties.isStreamMaterializationPath = getMaterializationConfig(determiningFeatureView, 'online');
    }
  } else if (featureViewType === 'batch' && sourceNodeType !== 'raw_stream_node') {
    properties.isBatchMaterializationPath =
      getMaterializationConfig(determiningFeatureView, 'offline') ||
      getMaterializationConfig(determiningFeatureView, 'online');
  }

  return properties;
};

/**
 * Collects the request-data-source schema rows of an ODFV.
 *
 * @param odfvId - Key of the ODFV inside `idToFcoMap`.
 * @param idToFcoMap - Lookup from id to FCO.
 * @returns The ODFV id, its display name (falling back to the id) and the
 *   schema fields of its request data source; `schema` is `undefined` when the
 *   ODFV has no request data source or that source declares no schema.
 * @throws Error when the request data source has a schema without `fields`, or
 *   (via `getRequestDataSourceFromODFV`) when the ODFV is not in the map.
 */
const collectSchemaFromRequestDataSources = (odfvId: string, idToFcoMap: Record<string, FCO>) => {
  const odfv = idToFcoMap[odfvId] as FeatureViewFCO;
  const requestContext = getRequestDataSourceFromODFV(odfv);
  const declaredSchema = requestContext?.schema as { fields: ODFVSchemaRow[] } | undefined;

  let schema: ODFVSchemaRow[] | undefined;

  if (declaredSchema) {
    if (!declaredSchema.fields) {
      throw new Error(`Missing fields in schema. ${JSON.stringify(odfv, null, 2)}`);
    }

    schema = declaredSchema.fields;
  }

  return {
    id: odfvId,
    name: odfv.name || odfvId,
    schema,
  };
};

/**
 * Pairs every listed feature view id with the features of that feature view.
 *
 * @param idToFco - Lookup from id to FCO; every id in `FVs` must be present.
 * @param FVs - Feature view ids, in the order the result should have.
 * @returns One `{ id, features }` entry per id.
 */
const collectFeatureServiceSchemaFromFVs = (idToFco: Record<string, FCO>, FVs: string[]) =>
  FVs.map((fvId) => {
    const { features } = idToFco[fvId] as FeatureViewFCO;

    return { id: fvId, features };
  });

/**
 * Builds the graph id of an FCO: `<fcoType>:<name>` when the FCO has a name,
 * otherwise its own `id`.
 */
const idFormatter = (fco: FCO) => (fco.name ? `${fco.fcoType}:${fco.name}` : fco.id);

/**
 * Re-keys an FCO map by the formatted id produced by `idFormatter`.
 *
 * @param map - FCOs keyed by hash.
 * @returns The same FCOs keyed by their formatted id.
 * @throws Error when any FCO has no name.
 */
const buildMapWithNameAsKey = (map: Record<string, FCO>): Record<string, FCO> => {
  const byFormattedId: Record<string, FCO> = {};

  for (const hash of Object.keys(map)) {
    const fco = map[hash];
    const id = idFormatter(fco);

    if (!fco.name) {
      throw new Error(`FCO with id = '${id}' does not have a name. This should not happen...`);
    }

    byFormattedId[id] = fco;
  }

  return byFormattedId;
};

/**
 * Builds the lookup from FCO hash to the formatted id produced by `idFormatter`.
 *
 * @param map - FCOs keyed by hash.
 * @returns For every hash in `map`, the formatted id of its FCO.
 */
const buildHashToIdMap = (map: Record<string, FCO>): Record<string, string> =>
  Object.keys(map).reduce<Record<string, string>>((hashToId, hash) => {
    hashToId[hash] = idFormatter(map[hash]);

    return hashToId;
  }, {});

const dataflowSpecSelector = (
  data: WorkspaceFCOContainer,
  workspace: string,
  anchor?: {
    id: string;
    type: FCO['fcoType'];
  }
): DataFlowSpec => {
  /**
   * Things to keep track of
   */

  const nodes: DataFlowNode[] = [];
  const edges: DataFlowEdge[] = [];
  const edgeRecord: Record<string, DataFlowEdge> = {};
  const nodesMap: NodesMapType = {};

  const hashToId = buildHashToIdMap(data.idToFcoMap);
  const idToFco = buildMapWithNameAsKey(data.idToFcoMap);

  /**
   * Helper Functions
   */

  /**
   * Returns the already-registered data source node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not a data source node.
   */
  const getDataSourceNode = (id: string): DataSourceNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'data_source') {
      throw new Error(`Node with id = '${id}' is not of type DataSourceNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered feature view node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not a feature view node.
   */
  const getFeatureViewNode = (id: string): FeatureViewNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'feature_view') {
      throw new Error(`Node with id = '${id}' is not of type FeatureViewNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered ODFV node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not an ODFV node.
   */
  const getODFVNode = (id: string): ODFVNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'odfv') {
      throw new Error(`Node with id = '${id}' is not of type ODFVNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered feature service node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not a feature service node.
   */
  const getFeatureServiceNode = (id: string): FeatureServiceNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'feature_service') {
      throw new Error(`Node with id = '${id}' is not of type FeatureServiceNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered transformation node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not a transformation node.
   */
  const getTransformationNode = (id: string): TransformationNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'transformation') {
      throw new Error(`Node with id = '${id}' is not of type TransformationNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered aggregation node with this id, if any.
   *
   * @throws Error when a node with this id exists but is not an aggregation node.
   */
  const getAggregationNode = (id: string): AggregationNode | undefined => {
    const candidate = nodesMap[id];

    if (!candidate) {
      return undefined;
    }

    if (candidate.type !== 'aggregation') {
      throw new Error(`Node with id = '${id}' is not of type AggregationNode`);
    }

    return candidate;
  };

  /**
   * Returns the already-registered embedding model node with this id, if any.
   *
   * @param id - Node id to look up in `nodesMap`.
   * @returns The node, or `undefined` when no node with this id is registered.
   * @throws Error when a node with this id exists but is not an embedding model node.
   */
  const getEmbeddingModelNode = (id: string): EmbeddingModelNode | undefined => {
    const embeddingModelNode = nodesMap[id];

    if (!embeddingModelNode) {
      return undefined;
    }

    if (embeddingModelNode.type !== 'embeddingModel') {
      // The message used to name AggregationNode (copy/paste slip), which pointed
      // whoever read the error at the wrong node kind.
      throw new Error(`Node with id = '${id}' is not of type EmbeddingModelNode`);
    }

    return embeddingModelNode;
  };

  /**
   * Lists the realtime feature views (ODFVs) that take `fvId` as an input.
   *
   * A feature view qualifies when it is flagged `isRealtime` and one of its
   * `dependentFeatureViews` entries resolves (through `hashToId`) to `fvId`.
   * Feature views without `dependentFeatureViews`, and entries without an
   * `innerNodeId`, are treated as "no dependency" instead of crashing.
   *
   * @param fvId - Graph id of the upstream feature view.
   * @returns Graph ids of the matching ODFVs, in `data.featureViews` order.
   */
  const getDownstreamODFVs = (fvId: string): string[] => {
    const odfvs = data.featureViews.filter((fv) => {
      if (!fv.isRealtime) {
        return false;
      }

      return (fv.dependentFeatureViews || []).some((inputNode: InputNode) => {
        const inputHash = inputNode.node.innerNodeId;

        return inputHash ? hashToId[inputHash] === fvId : false;
      });
    });

    return odfvs.map((odfv) => hashToId[odfv.id]);
  };

  /**
   * Registers the edge `source -> target`, or updates it when it already exists.
   *
   * Edges are identified by `edgeIdFn({ source, target })`. A new edge starts
   * with empty `pathProperties` and `animationFlows` and is appended to `edges`
   * and `edgeRecord`; an existing edge is updated in place.
   *
   * pathProperties vs. animationFlows:
   *
   * `animationFlows` is a `Record<string, DataFlowPathProperties>` keyed by
   * "animation anchor": the node whose configuration is the reason an animation
   * should run on this edge. For example, the edge between a stream source node
   * and a stream data source should have the streaming animation if some FV
   * using that data source has online materialization:
   *
   *   animationFlows: { 'stream_fv_id': { isStreamMaterialization: true } }
   *
   * Animation anchors are:
   * - Feature Views, which determine whether materializations are on or off in
   *   paths leading from sources into the store.
   * - Feature Services, which determine what animations run from the store to
   *   the training/inference nodes.
   *
   * Since one edge can be part of several dataflow paths (e.g. several FVs
   * sharing a stream data source), every anchor's contribution is recorded
   * separately. `pathProperties` is then recomputed as the merge of all
   * recorded flows. The per-anchor records currently serve debugging; the
   * intent is for highlighting logic to use them as its source of truth.
   *
   * @param source - Id of the node the edge starts at.
   * @param target - Id of the node the edge ends at.
   * @param pathProperties - Properties contributed by `animationAnchorId`;
   *   ignored when empty or when no anchor is given.
   * @param animationAnchorId - Id of the node responsible for `pathProperties`.
   * @throws Error when the stored edge lacks `pathProperties` or `animationFlows`.
   */
  const addEdge = (
    source: string,
    target: string,
    pathProperties?: DataFlowPathProperties,
    animationAnchorId?: string
  ) => {
    const edgeId = edgeIdFn({ source, target });
    const knownEdge = edgeRecord[edgeId];

    const edge: DataFlowEdge = knownEdge || { source, target, pathProperties: {}, animationFlows: {} };

    if (edge.pathProperties === undefined) {
      throw new Error(`pathProperties is undefined for edgeId: ${edgeId}. This should not happen...`);
    }

    const flows = edge.animationFlows;

    if (flows === undefined) {
      throw new Error(`animationFlows is undefined for edgeId: ${edgeId}. This should not happen...`);
    }

    if (animationAnchorId && pathProperties && Object.keys(pathProperties).length > 0) {
      // Fold the new properties into whatever this anchor already contributed.
      const anchorProperties = mergePathProperties(flows[animationAnchorId] || {}, pathProperties);

      if (Object.keys(anchorProperties).length > 0) {
        flows[animationAnchorId] = anchorProperties;
      }

      // The edge's effective properties are the merge of every anchor's flow.
      let combined: DataFlowPathProperties = {};

      for (const anchorId of Object.keys(flows)) {
        combined = mergePathProperties(combined, flows[anchorId]);
      }

      edge.pathProperties = combined;
    }

    if (!knownEdge) {
      edges.push(edge);
      edgeRecord[edgeId] = edge;
    }
  };

  /**
   * Registers a brand-new node in both `nodes` and `nodesMap`.
   *
   * @returns The node that was passed in.
   * @throws Error when a node with the same id is already registered.
   */
  const addNode = <T extends DataFlowNode>(node: T): T => {
    if (nodesMap[node.id]) {
      throw new Error(`Adding a new node with an ID that conflicts with an existing node in nodesMap: ${node.id}`);
    }

    nodes.push(node);
    nodesMap[node.id] = node;

    return node;
  };

  /**
   * Node Processing Functions
   *
   * The following functions handle the actual processing of
   * nodes and edges. There are two classes: "add" functions and
   * "register" functions:
   *
   * - addDataSource
   * - addFeatureView
   * - addODFV
   * - addFeatureService
   *
   * The "add" functions handle adding individual nodes
   * and their auxiliary nodes (e.g. inference/training nodes
   * associated with Feature Services). They also call out
   * to the "register" functions.
   *
   * - registerDownstreamFeatureViewOnDataSource
   * - registerDirectOfflineReadOnFeatureView
   * - registerDownstreamFeatureServiceOnFeatureView
   * - registerDownstreamODFVonFeatureView
   * - registerDownstreamFeatureServiceOnODFV
   *
   * The "register" functions handle most of the work of
   * adding edges and deciding which animations are necessary.
   * Much of the gnarly logic of what needs to be connected
   * is handled in these functions.
   */

  /**
   * Adds the node for a data source together with its raw source nodes, or
   * returns the node if it was added before.
   *
   * Depending on the data source, up to three raw nodes are created, each
   * connected to the data source node by an edge:
   * - a stream source, when the type is 'Stream' and an underlying stream type is set;
   * - a push source, when the type is 'Push';
   * - a batch source, whenever an underlying batch type is set (regardless of
   *   the data source type, since every type can carry a batch config).
   *
   * @param dsId - Graph id of the data source (a key of `idToFco`).
   * @returns The data source node.
   */
  const addDataSource = (dsId: string) => {
    const knownNode = getDataSourceNode(dsId);

    if (knownNode) {
      return knownNode;
    }

    const ds = idToFco[dsId] as DataSourceFCO;
    const upstreamNodes: string[] = [];

    // Registers one raw source node and wires it into the data source.
    const attachRawSource = (rawSource: RawStreamNode | RawBatchNode) => {
      upstreamNodes.push(rawSource.id);
      addNode(rawSource);
      addEdge(rawSource.id, dsId);
    };

    if (ds.dataSourceType === 'Stream' && ds.underlyingStreamDataSourceType) {
      const streamSource: RawStreamNode = {
        id: `${dsId}_stream_source`,
        name: `${ds.name}:${ds.underlyingStreamDataSourceType}`,
        type: 'raw_stream_node',
        sourceType: ds.underlyingStreamDataSourceType,
        downstreamNodes: [dsId],
        dataSourceId: dsId,
      };

      attachRawSource(streamSource);
    }

    if (ds.dataSourceType === 'Push') {
      const pushSource: RawStreamNode = {
        id: `${dsId}_push_source`,
        name: `${ds.name}:Push Source`,
        type: 'raw_stream_node',
        sourceType: StreamingDataSourceType.PUSH,
        downstreamNodes: [dsId],
        dataSourceId: dsId,
      };

      attachRawSource(pushSource);
    }

    if (ds.underlyingBatchSourceDataType) {
      const batchSource: RawBatchNode = {
        id: `${dsId}_batch_source`,
        name: `${ds.name}:${ds.underlyingBatchSourceDataType}`,
        type: 'raw_batch_node',
        sourceType: ds.underlyingBatchSourceDataType,
        downstreamNodes: [dsId],
        dataSourceId: dsId,
      };

      attachRawSource(batchSource);
    }

    // The data source node itself goes in last, once its upstream ids are known.
    const dsNode: DataSourceNode = {
      id: dsId,
      name: ds.name || dsId,
      type: 'data_source',
      dataSourceType: ds.dataSourceType,
      upstreamNodes,
      downstreamNodes: ds.dependentFeatureViews.map((fvHash) => hashToId[fvHash]),
      description: ds.description || '',
    };

    return addNode(dsNode);
  };

  /**
   * Connects a feature view to one of its data sources.
   *
   * Steps, in order:
   * 1. Make sure the data source node exists (creating it through
   *    `addDataSource` when needed) and record `fvId` among its downstream nodes.
   * 2. For each raw node upstream of the data source, add the materialization
   *    animation implied by the feature view's configuration to the
   *    raw node -> data source edge.
   * 3. Add the data source -> feature view edge with its materialization flags.
   *
   * Every animation added here is anchored on `fvId`.
   *
   * @param fvId - Graph id of the feature view (a key of `idToFco`).
   * @param dsId - Graph id of the data source feeding the feature view.
   * @throws Error when an upstream node id of the data source is missing from `nodesMap`.
   */
  const registerDownstreamFeatureViewOnDataSource = (fvId: string, dsId: string) => {
    const fv = idToFco[fvId] as FeatureViewFCO;

    const dsNode = getDataSourceNode(dsId) || addDataSource(dsId);
    // Avoid listing the same feature view twice when it is registered repeatedly.
    if (!dsNode.downstreamNodes?.includes(fvId)) {
      dsNode.downstreamNodes?.push(fvId);
    }

    dsNode.upstreamNodes?.forEach((upstreamNode) => {
      const upstreamNodeObject = nodesMap[upstreamNode];

      if (!upstreamNodeObject) {
        throw new Error(`Could not find node with id = '${upstreamNode}'`);
      }

      if (upstreamNodeObject.type === 'raw_stream_node' && getMaterializationConfig(fv, 'online')) {
        /**
         * If the upstream node is a stream source
         * or a push source, then we should add a stream animation
         * to the edge between the raw source and the data source.
         */
        addEdge(
          upstreamNode,
          dsId,
          {
            isStreamMaterializationPath: true,
          },
          fvId
        );
      }

      if (upstreamNodeObject.type === 'raw_batch_node') {
        /**
         * If the upstream node is a batch source,
         * then we should add a batch animation
         * to the edge between the batch source
         * and the data source.
         */
        if (fv.featureViewType !== 'batch' && getMaterializationConfig(fv, 'offline')) {
          /**
           * If the FV is not a batch FV, and it has offline materialization,
           * then we should add a batch animation to the edge.
           */
          addEdge(
            upstreamNode,
            dsId,
            {
              isBatchMaterializationPath: true,
            },
            fvId
          );
        }

        if (fv.featureViewType === 'batch') {
          /**
           * If the FV is a batch FV, then we should add a batch animation
           * if it either has offline materialization, or online materialization.
           */
          if (getMaterializationConfig(fv, 'offline') || getMaterializationConfig(fv, 'online')) {
            addEdge(
              upstreamNode,
              dsId,
              {
                isBatchMaterializationPath: true,
              },
              fvId
            );
          }
        }
      }
    });

    /**
     * Edge between the DS node and the downstream FV
     */
    const pathProperties: DataFlowEdge['pathProperties'] = {};

    // Batch animation: offline materialization on any FV type, or online
    // materialization on a non-stream FV.
    if (
      getMaterializationConfig(fv, 'offline') ||
      (fv.featureViewType !== 'stream' && getMaterializationConfig(fv, 'online'))
    ) {
      pathProperties.isBatchMaterializationPath = true;
    }

    // Stream animation: only stream FVs with online materialization.
    if (fv.featureViewType === 'stream' && getMaterializationConfig(fv, 'online')) {
      pathProperties.isStreamMaterializationPath = true;
    }

    // Called even when `pathProperties` is empty so the edge itself always exists.
    addEdge(dsId, fvId, pathProperties, fvId);
  };

  /**
   * Collects every feature service that can be reached from a feature view:
   * the ones that use it directly plus the ones that use any ODFV downstream
   * of it.
   *
   * @param fvId - Graph id of the feature view (a key of `idToFco`).
   * @returns De-duplicated feature service ids; direct ones come first.
   */
  const getAllReachableFeatureServicesFromFeatureView = (fvId: string): string[] => {
    const reachable = new Set<string>();

    // Adds the feature services of one feature view, translating hash -> id.
    const collectFrom = (featureView: FeatureViewFCO) => {
      for (const fsHash of featureView.featureServices) {
        reachable.add(hashToId[fsHash]);
      }
    };

    collectFrom(idToFco[fvId] as FeatureViewFCO);

    for (const odfvId of getDownstreamODFVs(fvId)) {
      collectFrom(idToFco[odfvId] as FeatureViewFCO);
    }

    return [...reachable];
  };

  /**
   * Connects a transformation (belonging to feature view `fvId`) to a data
   * source and annotates the raw source -> data source edges.
   *
   * Makes sure the data source node exists, records `tfId` among its downstream
   * nodes, and for every raw node upstream of the data source adds:
   * - the stream animation (anchored on `fvId`) for raw stream nodes when the
   *   feature view has online materialization;
   * - the batch animation (anchored on `fvId`) for raw batch nodes, per the
   *   feature view's materialization config;
   * - an offline-read animation per reachable feature service (anchored on the
   *   feature service) for raw batch nodes when the feature view has no
   *   offline materialization.
   *
   * Note that this function does not add the data source -> transformation
   * edge itself.
   *
   * @param tfId - Graph id of the transformation.
   * @param fvId - Graph id of the feature view that owns the transformation.
   * @param dsId - Graph id of the data source.
   * @throws Error when an upstream node id of the data source is missing from `nodesMap`.
   */
  const registerDownstreamTransformationOnDataSource = (tfId: string, fvId: string, dsId: string) => {
    const fv = idToFco[fvId] as FeatureViewFCO;

    const reachableFeatureServices = getAllReachableFeatureServicesFromFeatureView(fvId);

    const dsNode = getDataSourceNode(dsId) || addDataSource(dsId);
    // Avoid listing the same transformation twice when it is registered repeatedly.
    if (!dsNode.downstreamNodes?.includes(tfId)) {
      dsNode.downstreamNodes?.push(tfId);
    }

    dsNode.upstreamNodes?.forEach((upstreamNode) => {
      const upstreamNodeObject = nodesMap[upstreamNode];

      if (!upstreamNodeObject) {
        throw new Error(`Could not find node with id = '${upstreamNode}'`);
      }

      if (upstreamNodeObject.type === 'raw_stream_node' && getMaterializationConfig(fv, 'online')) {
        /**
         * If the upstream node is a stream source
         * or a push source, then we should add a stream animation
         * to the edge between the raw source and the data source.
         */
        addEdge(
          upstreamNode,
          dsId,
          {
            isStreamMaterializationPath: true,
          },
          fvId
        );
      }

      if (upstreamNodeObject.type === 'raw_batch_node') {
        /**
         * If the upstream node is a batch source,
         * then we should add a batch animation
         * to the edge between the batch source
         * and the data source.
         */

        const rawBatchToFVPathProperties: DataFlowEdge['pathProperties'] = {};

        // Offline materialization implies the batch animation for any FV type.
        if (getMaterializationConfig(fv, 'offline')) {
          rawBatchToFVPathProperties.isBatchMaterializationPath = true;
        }

        if (fv.featureViewType === 'batch' && getMaterializationConfig(fv, 'online')) {
          /**
           * Batch FVs additionally get the batch animation when only online
           * materialization is enabled.
           */
          rawBatchToFVPathProperties.isBatchMaterializationPath = true;
        }

        // Called even when the properties are empty; `addEdge` then only makes
        // sure the edge exists.
        addEdge(upstreamNode, dsId, rawBatchToFVPathProperties, fvId);

        if (reachableFeatureServices.length > 0 && !getMaterializationConfig(fv, 'offline')) {
          /**
           * If the FV does not have offline materialization, but it does have
           * downstream Feature Services, then for each downstream feature service
           * we need to register an offline read anchored on that Feature Service.
           */
          reachableFeatureServices.forEach((fsId) => {
            addEdge(upstreamNode, dsId, { isOfflineReadPath: true }, fsId);
          });
        }
      }
    });
  };

  /**
   * Adds a request data source node, or returns whatever node is already
   * registered under `rdsId`.
   *
   * @param rdsId - Graph id for the request data source node.
   * @param name - Display name of the node.
   * @param schemaCount - Number of fields in the request schema.
   */
  const addRDSNodeInFVTemplate = (rdsId: string, name: string, schemaCount: number) => {
    const registered = nodesMap[rdsId];

    if (registered) {
      return registered;
    }

    const requestNode: RequestNode = {
      id: rdsId,
      name,
      type: 'request_data_source',
      upstreamNodes: [],
      downstreamNodes: [],
      schemaCount,
    };

    return addNode(requestNode);
  };

  /**
   * Adds the node for a transformation, or returns it if it was added before.
   * The new node starts without upstream or downstream nodes.
   *
   * @param tfId - Graph id of the transformation (a key of `idToFco`).
   */
  const addTransformation = (tfId: string) => {
    const knownNode = getTransformationNode(tfId);

    if (knownNode) {
      return knownNode;
    }

    // NOTE(review): the FCO is a transformation but is cast to DataSourceFCO;
    // only `name` and `description` are read from it. Confirm and use the proper type.
    const { name, description } = idToFco[tfId] as DataSourceFCO;

    const transformationNode: TransformationNode = {
      id: tfId,
      name: name || tfId,
      type: 'transformation',
      upstreamNodes: [],
      downstreamNodes: [],
      description: description || '',
    };

    return addNode(transformationNode);
  };

  /**
   * Adds the node for an aggregation, or returns it if it was added before.
   * The aggregation's name doubles as its graph id; the new node sits between
   * 'STORE' (upstream) and `anchorNodeId` (downstream).
   *
   * @param agg - Aggregation feature to represent.
   * @param anchorNodeId - Id of the node placed downstream of the aggregation.
   */
  const addAggregation = (agg: TemporalAggregateFeatureForFCO, anchorNodeId: string) => {
    const { name } = agg;
    const knownNode = getAggregationNode(name);

    if (knownNode) {
      return knownNode;
    }

    const aggregationNode: AggregationNode = {
      id: name,
      name,
      type: 'aggregation',
      upstreamNodes: ['STORE'],
      downstreamNodes: [anchorNodeId],
    };

    return addNode(aggregationNode);
  };

  /**
   * Adds the node for an embedding model plus the two batch-materialization
   * edges around it (root -> model and model -> 'STORE'), or returns the node
   * if it was added before. The model's name doubles as its graph id.
   *
   * @param embeddingModel - Model description; `name`, `isCustomModel` and
   *   `inputColumns` are read from it.
   * @param anchorNodeId - Id of the node downstream of the model; also the
   *   animation anchor of both edges.
   * @param rootTransformationNode - Transformation feeding the model; when it
   *   has no id, `anchorNodeId` is used as the edge source instead.
   */
  const addEmbeddingsModel = (
    embeddingModel: any,
    anchorNodeId: string,
    rootTransformationNode: TransformationNode
  ) => {
    const modelId = embeddingModel.name;
    const knownNode = getEmbeddingModelNode(modelId);

    if (knownNode) {
      return knownNode;
    }

    const modelNode: EmbeddingModelNode = {
      id: modelId,
      name: modelId,
      type: 'embeddingModel',
      isCustomModel: !!embeddingModel.isCustomModel,
      upstreamNodes: ['STORE'],
      downstreamNodes: [anchorNodeId],
      inputColumns: embeddingModel.inputColumns,
    };

    const edgeSourceId = rootTransformationNode?.id || anchorNodeId;
    const batchMaterialization = { isBatchMaterializationPath: true };

    // Edges are registered before the node, as the edge ids do not depend on it.
    addEdge(edgeSourceId, modelId, batchMaterialization, anchorNodeId);
    addEdge(modelId, 'STORE', batchMaterialization, anchorNodeId);

    return addNode(modelNode);
  };

  /**
   * Unpacks the pipeline recursively, and adds nodes and edges.
   * Note that this might be run multiple times if there are multiple
   * dataflow sources, e.g. multiple feature services.
   *
   * Starting at `rootNode` (which must be a transformation pipeline node), each
   * transformation is added as a graph node and each of its inputs is handled
   * according to its pipeline node type:
   * - request data source: adds the request node and an edge into the transformation;
   * - transformation: recurses and links the two transformations;
   * - feature view: adds the input feature view, its store edges and its data sources;
   * - data source: adds the data source and links it to the transformation.
   *
   * @param rootNode - Root transformation node of the pipeline.
   * @param dataflowSource - The node on whose behalf the pipeline is unpacked:
   *   `nodeId` is used as the animation anchor for most edges, and
   *   `dataflowPathProperties` are the properties applied to them.
   * @returns The graph node created (or found) for `rootNode`.
   * @throws Error when a transformation pipeline node has no `innerNodeId`, or
   *   when a request data source input has no inner `node`.
   */
  const unpackPipeline = (
    rootNode: PipelineNode,
    dataflowSource: {
      nodeId: string;
      dataflowPathProperties: DataFlowPathProperties;
    }
  ) => {
    const { nodeId, dataflowPathProperties } = dataflowSource;

    // Recursive worker: registers `node` as a transformation and wires up all of its inputs.
    const traceTransformationNode = (node: PipelineNode) => {
      if (!node.innerNodeId) {
        throw new Error(`node.innerNodeId is undefined. This should not happen...`);
      }

      const tfId = hashToId[node.innerNodeId];

      const currentTransformationNode = addTransformation(tfId);

      // NOTE(review): the `upstreamNodes`/`downstreamNodes` pushes below are not
      // de-duplicated, so running this more than once for the same pipeline
      // appears to add repeated ids. Confirm whether consumers tolerate that.
      node.inputs.forEach((input: InputNode) => {
        if (input.node.nodeType === PipelineNodeTypes.REQUEST_DATA_SOURCE) {
          if (!input.node.node) {
            throw new Error(`input.node.node is undefined. This should not happen...`);
          }

          const { input_name } = input.node.node;
          const rdsId = `rds:${input_name}`;

          const schemaCount = input.node.node.request_context.schema.fields.length;

          const rdsNode = addRDSNodeInFVTemplate(rdsId, input_name, schemaCount);
          rdsNode.downstreamNodes?.push(tfId);

          addEdge(rdsId, tfId, dataflowPathProperties, nodeId);
        }

        // Inputs without a hash cannot be resolved to a graph id; nothing more to do for them.
        if (!input.node.innerNodeId) {
          return;
        }

        const inputId = hashToId[input.node.innerNodeId];

        // Handle Transformation
        if (input.node.nodeType === PipelineNodeTypes.TRANSFORMATION) {
          currentTransformationNode.upstreamNodes?.push(inputId);

          const inputTransformationNode = traceTransformationNode(input.node);
          inputTransformationNode.downstreamNodes?.push(tfId);

          addEdge(inputId, tfId, dataflowPathProperties, nodeId);
        }

        // Handle Feature View
        if (input.node.nodeType === PipelineNodeTypes.FEATURE_VIEW) {
          // If it is a Feature View Feeding into
          // our transformation node, then we are in an ODFV (Realtime Featureview)

          currentTransformationNode.upstreamNodes?.push(inputId);

          const inputFeatureViewFCO = idToFco[inputId] as FeatureViewFCO;
          const inputFeatureViewNode = addFeatureView(inputId);

          inputFeatureViewNode.downstreamNodes?.push(tfId);

          const inputFeatureViewPathProperties = getFeatureViewDataflowPathProperties({
            sourceNodeType: 'feature_view',
            targetNodeType: 'transformation',
            determiningFeatureView: inputFeatureViewFCO,
          });

          if (!getMaterializationConfig(inputFeatureViewFCO, 'offline') && dataflowPathProperties.isOfflineReadPath) {
            // If the upstream input FV does not write to the offline store,
            // then the downstream transformation node will read from
            // the input FV directly (instead of the store)
            addEdge(inputId, tfId, { isOfflineReadPath: true }, nodeId);
          }

          const upstreamWritesToOnlineStore = getMaterializationConfig(inputFeatureViewFCO, 'online');
          const upstreamWritesToOfflineStore = getMaterializationConfig(inputFeatureViewFCO, 'offline');

          if (upstreamWritesToOnlineStore || upstreamWritesToOfflineStore) {
            // The input FV materializes to at least one store, so add the store
            // edges. Write-side edges are anchored on the input FV (`inputId`),
            // read-side edges on the dataflow source (`nodeId`).
            addEdge(inputId, 'STORE', inputFeatureViewPathProperties, inputId);

            // If the upstream writes to the online store, then we need to
            // add the store edges
            if (upstreamWritesToOnlineStore) {
              const onlineStorePathProperties: DataFlowEdge['pathProperties'] = {};

              if (inputFeatureViewFCO.featureViewType === 'stream') {
                onlineStorePathProperties.isStreamMaterializationPath = true;
              } else {
                onlineStorePathProperties.isBatchMaterializationPath = true;
              }

              addEdge('store-input', 'ONLINE_STORE', onlineStorePathProperties, inputId);
              addEdge(
                'ONLINE_STORE',
                'store-output',
                {
                  isOnlineServingPath: dataflowPathProperties.isOnlineServingPath,
                },
                nodeId
              );
            }

            // If the upstream writes to the offline store
            if (upstreamWritesToOfflineStore) {
              addEdge(
                'store-input',
                'OFFLINE_STORE',
                {
                  isBatchMaterializationPath: true,
                },
                inputId
              );

              addEdge(
                'OFFLINE_STORE',
                'store-output',
                {
                  isOfflineReadPath: dataflowPathProperties.isOfflineReadPath,
                },
                nodeId
              );
            }

            // Add the edge from the store to the transformation in the ODFV
            addEdge(
              'STORE',
              tfId,
              {
                isOnlineServingPath: dataflowPathProperties.isOnlineServingPath,
                isOfflineReadPath: upstreamWritesToOfflineStore,
              },
              nodeId
            );
          } else {
            // There is no online/offline nodes so we connect FV to the transformation data source inside the Realtime FV
            if (tfId && inputId) {
              // Only Realtime FV
              // NOTE(review): this edge runs transformation -> feature view, the
              // opposite direction of every other input edge in this function
              // (input -> transformation). Confirm this is intended.
              addEdge(tfId, `${inputId}`);
            }
          }

          // Wire the input FV's data sources; animations are attributed to the input FV.
          inputFeatureViewFCO.dataSourceIds.forEach((dsHash) => {
            const dsId = hashToId[dsHash];

            registerDownstreamTransformationOnDataSource(tfId, inputId, dsId);

            if (!getMaterializationConfig(inputFeatureViewFCO, 'offline')) {
              // Without offline materialization, offline reads for every reachable
              // Feature Service are marked on the data source -> input FV edge.
              const allReachableFeatureServices = getAllReachableFeatureServicesFromFeatureView(inputId);

              allReachableFeatureServices.forEach((fsId) => {
                addEdge(dsId, inputId, { isOfflineReadPath: true }, fsId);
              });
            }
          });
        }

        // Handle Data Source
        if (input.node.nodeType === PipelineNodeTypes.DATA_SOURCE) {
          // Here `nodeId` is looked up as a feature view: the one whose pipeline is being unpacked.
          const currentFeatureViewFCO = idToFco[nodeId] as FeatureViewFCO;
          const currentFeatureViewPathProperties = getFeatureViewDataflowPathProperties({
            sourceNodeType: 'data_source',
            targetNodeType: 'transformation',
            determiningFeatureView: currentFeatureViewFCO,
          });

          currentTransformationNode.upstreamNodes?.push(inputId);

          const inputDataSourceNode = addDataSource(inputId);
          inputDataSourceNode.downstreamNodes?.push(tfId);

          registerDownstreamTransformationOnDataSource(tfId, nodeId, inputId);

          addEdge(inputId, tfId, currentFeatureViewPathProperties, nodeId);

          if (!getMaterializationConfig(currentFeatureViewFCO, 'offline')) {
            // If the current FV does not write to the offline store, offline
            // reads are marked directly on the data source -> transformation
            // edge, once per Feature Service that uses the FV or one of its
            // downstream ODFVs.

            currentFeatureViewFCO.featureServices.forEach((fsHash) => {
              const fsId = hashToId[fsHash];

              addEdge(inputId, tfId, { isOfflineReadPath: true }, fsId);
            });

            getDownstreamODFVs(nodeId).forEach((odfvId) => {
              const odfvFCO = idToFco[odfvId] as FeatureViewFCO;

              odfvFCO.featureServices.forEach((fsHash) => {
                const fsId = hashToId[fsHash];

                addEdge(inputId, tfId, { isOfflineReadPath: true }, fsId);
              });
            });
          }
        }
      });

      return currentTransformationNode;
    };

    return traceTransformationNode(rootNode);
  };

  /**
   * Connects the inputs found in a pipeline directly to a realtime feature view.
   *
   * - Request data source nodes are registered as `rds:<input_name>` and get an
   *   edge into `realTimeFvId`.
   * - Feature view nodes that carry an `innerNodeId` are registered as
   *   `featureView:<name>` and get an edge into `realTimeFvId`.
   *
   * Undefined entries and every other node type are ignored.
   *
   * @param realTimeFvId id of the realtime feature view that receives the edges
   * @param pipelineNode pipeline nodes to inspect (entries may be undefined)
   * @param idToFcoMap lookup used to resolve a feature view node's `innerNodeId`
   */
  const addJoinInputsEdge = (
    realTimeFvId: string,
    pipelineNode: (PipelineNode | undefined)[],
    idToFcoMap: Record<string, FCO | FeatureViewFCO>
  ) => {
    pipelineNode?.forEach((node: PipelineNode | undefined) => {
      if (!node) {
        return;
      }

      // Request data source input
      if (node.nodeType === PipelineNodeTypes.REQUEST_DATA_SOURCE) {
        const inputName = node.node.input_name;
        const rdsId = `rds:${inputName}`;
        const fieldCount = node.node.request_context.schema.fields.length;

        addRDSNodeInFVTemplate(rdsId, inputName, fieldCount).downstreamNodes?.push(realTimeFvId);
        addEdge(rdsId, realTimeFvId);
      }

      // Feature view input
      if (node.nodeType === PipelineNodeTypes.FEATURE_VIEW && node.innerNodeId) {
        const upstreamFvId = `featureView:${idToFcoMap[node.innerNodeId].name}`;

        addFeatureView(upstreamFvId);
        addEdge(upstreamFvId, realTimeFvId);
      }
    });
  };

  /**
   * Registers the node for a (non-realtime) Feature View together with the
   * edges that tie it to data sources, the store, feature services and
   * downstream ODFVs. If a node with this id already exists it is returned
   * unchanged and nothing else happens.
   *
   * Non-anchor mode:
   * - every data source of the FV is recorded as upstream and notified through
   *   `registerDownstreamFeatureViewOnDataSource`;
   * - when online or offline materialization is enabled, an edge from the FV to
   *   `STORE` is added.
   *
   * Anchor mode (`options.isAnchor`):
   * - the FV pipeline is unpacked into transformation nodes;
   * - embedding / inference models or a direct root-transformation → `STORE`
   *   edge are added;
   * - the store is connected back to the FV (optionally through temporal
   *   aggregation nodes);
   * - downstream feature services and ODFVs are added without tracing upstream.
   *
   * In both modes the `store-input` → `ONLINE_STORE` / `OFFLINE_STORE` edges are
   * added according to the FV's materialization config.
   *
   * NOTE(review): the fourth argument of `addEdge` appears to be the id of the
   * FCO the path properties are attributed to — confirm against `addEdge`.
   *
   * @param fvId id of the feature view; must be a key of `idToFco`
   * @param options.isAnchor render this feature view as the anchor of the graph
   * @returns the value returned by `addNode` for the new node, or the existing node
   * @throws Error when `isAnchor` is set and the feature view has no pipeline root
   */
  const addFeatureView = (
    fvId: string,
    options?: {
      isAnchor?: boolean;
    }
  ) => {
    const existingNode = getFeatureViewNode(fvId);

    if (existingNode) {
      return existingNode;
    }
    const upstreamNodes: string[] = [];
    const downstreamNodes: string[] = [];

    const fv = idToFco[fvId] as FeatureViewFCO;

    if (!options?.isAnchor) {
      fv.dataSourceIds.forEach((dsHash) => {
        const dsId = hashToId[dsHash];
        upstreamNodes.push(dsId);

        registerDownstreamFeatureViewOnDataSource(fvId, dsId);
      });
    }

    const hasMaterialization = getMaterializationConfig(fv, 'online') || getMaterializationConfig(fv, 'offline');
    /**
     * Should the FV connect to the store?
     *
     * It should if online = true OR offline = true
     *
     * ... except when the FV is the anchor. Then we
     * render the store INSIDE the FV wrapper, and thus
     * we don't need to connect it.
     */
    if (hasMaterialization && !options?.isAnchor) {
      // Stream FVs only animate the batch path for offline materialization;
      // their online materialization is shown as a stream path instead.
      const edgeHasBatchAnimation =
        fv.featureViewType !== 'stream'
          ? getMaterializationConfig(fv, 'offline') || getMaterializationConfig(fv, 'online')
          : getMaterializationConfig(fv, 'offline');

      const edgeHasStreamAnimation = fv.featureViewType === 'stream' && getMaterializationConfig(fv, 'online');

      addEdge(
        fvId,
        'STORE',
        {
          isBatchMaterializationPath: edgeHasBatchAnimation,
          isStreamMaterializationPath: edgeHasStreamAnimation,
        },
        fvId
      );
    }

    // Edge inside the store wrapper: store input -> online store
    if (getMaterializationConfig(fv, 'online')) {
      const inputToStoreAnimations: DataFlowEdge['pathProperties'] = {};

      if (fv.featureViewType === 'stream') {
        inputToStoreAnimations.isStreamMaterializationPath = true;
      } else {
        inputToStoreAnimations.isBatchMaterializationPath = true;
      }

      addEdge('store-input', 'ONLINE_STORE', inputToStoreAnimations, fvId);
    }

    // Edge inside the store wrapper: store input -> offline store
    if (getMaterializationConfig(fv, 'offline')) {
      addEdge(
        'store-input',
        'OFFLINE_STORE',
        {
          isBatchMaterializationPath: true,
        },
        fvId
      );
    }

    // If the FV node is an anchor
    // then we need to parse the pipeline
    // and add the transformations and
    // aggregations as nodes.
    if (options?.isAnchor) {
      const allReachableFeatureServices = getAllReachableFeatureServicesFromFeatureView(fvId);

      // Declared here but only invoked after `rootTransformationNode`
      // (declared below) has been assigned.
      const addEdgesThatConnectStoreToAnchorFV = () => {
        if (fv.temporalAggregateFeatures?.length > 0) {
          // If there are temporal aggregates
          // then they go between the store
          // and the anchor FV node
          fv.temporalAggregateFeatures.forEach((agg) => {
            addAggregation(agg, fvId);

            allReachableFeatureServices.forEach((fsId) => {
              const fsFco = idToFco[fsId] as FeatureServiceFCO;

              addEdge(
                'STORE',
                agg.name,
                {
                  isOnlineServingPath: fsFco.isOnlineServingEnabled,
                  isOfflineReadPath: getMaterializationConfig(fv, 'offline'),
                },
                fsId
              );

              // Without offline materialization, offline reads come
              // straight from the root transformation node.
              if (!getMaterializationConfig(fv, 'offline')) {
                addEdge(rootTransformationNode.id, agg.name, { isOfflineReadPath: true }, fsId);
              }

              addEdge(
                agg.name,
                fvId,
                {
                  isOnlineServingPath: fsFco.isOnlineServingEnabled,
                  isOfflineReadPath: true,
                },
                fsId
              );
            });

            // No readers: still draw the (un-annotated) edges.
            if (allReachableFeatureServices.length === 0) {
              addEdge('STORE', agg.name);
              addEdge(agg.name, fvId);
            }
          });
        } else {
          // If there are no temporal aggregates
          // then the store runs directly into
          // the anchor FV node

          allReachableFeatureServices.forEach((fsId) => {
            const fsFco = idToFco[fsId] as FeatureServiceFCO;

            if (!getMaterializationConfig(fv, 'offline')) {
              addEdge(
                rootTransformationNode.id,
                fvId,
                {
                  isOfflineReadPath: true,
                },
                fsId
              );
            }

            addEdge(
              'STORE',
              fvId,
              {
                isOnlineServingPath: fsFco.isOnlineServingEnabled,
                isOfflineReadPath: getMaterializationConfig(fv, 'offline'),
              },
              fsId
            );
          });

          // Even if there are no feature services downstream,
          // we still need to add an edge from the store to the
          // anchor FV node.
          if (allReachableFeatureServices.length === 0) {
            addEdge('STORE', fvId);
          }
        }
      };

      let rootTransformationNode: TransformationNode;

      if (fv.pipeline?.root) {
        const transformationPathProperties: DataFlowPathProperties = getFeatureViewDataflowPathProperties({
          sourceNodeType: 'data_source',
          targetNodeType: 'transformation',
          determiningFeatureView: fv,
        });

        rootTransformationNode = unpackPipeline(fv.pipeline.root, {
          nodeId: fvId,
          dataflowPathProperties: transformationPathProperties,
        });

        // Add upstream and downstream references
        upstreamNodes.push(rootTransformationNode.id);
        rootTransformationNode.downstreamNodes?.push(fvId);

        /**
         ** IS FV AN EMBEDDING FV
         */
        if (fv?.isEmbedding && fv?.embeddings?.length) {
          let models = fv.embeddings.map((e) => {
            return { name: e?.model || '', inputColumns: e?.input_column_name ?? '' };
          });

          if (fv?.inferences?.length) {
            const customModels = fv.inferences.map((i) => {
              return {
                name: i?.model_artifact?.name || '',
                isCustomModel: true,
                inputColumns: i?.input_columns?.map((column) => column.name).join(`,`) ?? '',
              };
            });

            models = [...models, ...customModels];
          }

          // De-duplicate structurally. A `Set` of freshly created object
          // literals compares by reference and therefore never removes
          // anything, so identical models are collapsed by their serialized
          // form instead (first occurrence wins, order is preserved).
          const seenModelKeys = new Set<string>();

          models
            .filter((model) => {
              const modelKey = JSON.stringify(model);

              if (seenModelKeys.has(modelKey)) {
                return false;
              }

              seenModelKeys.add(modelKey);

              return true;
            })
            .forEach((model) => addEmbeddingsModel(model, fvId, rootTransformationNode));
        } else {
          // The root transformation node is
          // upstream of the store
          addEdge(rootTransformationNode.id, 'STORE', transformationPathProperties, fvId);
        }

        addEdgesThatConnectStoreToAnchorFV();
      } else {
        throw new Error(`Feature View with id = '${fvId}' does not have a pipeline root. This should not happen...`);
      }

      // We also need to trace Feature Services downstream.
      fv.featureServices.forEach((fsHash) => {
        const fsId = hashToId[fsHash];
        const fs = idToFco[fsId] as FeatureServiceFCO;

        addFeatureService(fsId, { doNotTraceUpstream: true });

        downstreamNodes.push(fsId);

        if (fs.isOnlineServingEnabled) {
          addEdge(
            'ONLINE_STORE',
            'store-output',
            {
              isOnlineServingPath: fs.isOnlineServingEnabled,
            },
            fsId
          );
        }

        if (getMaterializationConfig(fv, 'offline')) {
          addEdge(
            'OFFLINE_STORE',
            'store-output',
            {
              isOfflineReadPath: true,
            },
            fsId
          );
        }

        addEdge(
          fvId,
          fsId,
          {
            isOnlineServingPath: fs.isOnlineServingEnabled,
            isOfflineReadPath: true,
          },
          fsId
        );
      });

      // And ODFV too
      const downstreamODFVs = getDownstreamODFVs(fvId);

      downstreamODFVs.forEach((odfvId) => {
        addODFV(odfvId, { doNotTraceUpstream: true });

        downstreamNodes.push(odfvId);

        const odfv = idToFco[odfvId] as FeatureViewFCO;

        // An ODFV without readers still gets an un-annotated edge.
        if (odfv.featureServices.length === 0) {
          addEdge(fvId, odfvId);
        }

        odfv.featureServices.forEach((fsHash) => {
          const fsId = hashToId[fsHash];
          const fs = idToFco[fsId] as FeatureServiceFCO;

          const fsDataflowProperties: DataFlowPathProperties = {
            isOnlineServingPath: fs.isOnlineServingEnabled,
            isOfflineReadPath: true,
          };

          addFeatureService(fsId, {
            doNotTraceUpstream: true,
          });

          addEdge(
            'STORE',
            fvId,
            {
              isOnlineServingPath: fs.isOnlineServingEnabled,
              isOfflineReadPath: getMaterializationConfig(fv, 'offline'),
            },
            fsId
          );

          if (fsDataflowProperties.isOnlineServingPath) {
            addEdge('ONLINE_STORE', 'store-output', { isOnlineServingPath: true }, fsId);
          }

          if (getMaterializationConfig(fv, 'offline')) {
            addEdge('OFFLINE_STORE', 'store-output', { isOfflineReadPath: true }, fsId);
          }

          addEdge(fvId, odfvId, fsDataflowProperties, fsId);
          addEdge(odfvId, fsId, fsDataflowProperties, fsId);

          if (rootTransformationNode && !getMaterializationConfig(fv, 'offline')) {
            // If the FV does not write to the offline store,
            // then the downstream FV node will read from
            // the root transformation node directly (instead of the store)
            addEdge(rootTransformationNode.id, fvId, { isOfflineReadPath: true }, fsId);
          }
        });
      });
    }

    // Entries of the model map are subtracted from the feature list length
    // to obtain the count of plain features.
    const modelCount = Object.keys(fv[FeatureViewFCOFields.MODEL_MAP] ?? {}).length;
    const featuresCount = fv[FeatureViewFCOFields.FEATURES].length - modelCount;

    const fvNode: FeatureViewNode = {
      id: fvId,
      name: fv.name || fvId,
      description: fv.description || '',
      type: 'feature_view',
      featureViewType: fv.featureViewType || FeatureViewFCOType.UNKNOWN,
      isOnlineMaterializationEnabled: getMaterializationConfig(fv, 'online'),
      isOfflineMaterializationEnabled: getMaterializationConfig(fv, 'offline'),
      schemaCount: fv.features.length,
      upstreamNodes,
      downstreamNodes,
      featuresCount,
      modelCount: modelCount,
    };

    if (options?.isAnchor) {
      fvNode.isAnchor = true;
    }

    return addNode(fvNode);
  };

  /**
   * Draws the offline-read path used when a feature view is read directly
   * rather than through the store:
   *
   *   `<ds>_batch_source` -> data source -> feature view -> [ODFV ->] feature service
   *
   * The feature view node and its data source nodes are created on demand.
   *
   * @param fvId id of the feature view being read directly
   * @param fsId id of the feature service performing the read
   * @param odfvId optional ODFV sitting between the feature view and the feature service
   */
  const registerDirectOfflineReadOnFeatureView = (fvId: string, fsId: string, odfvId?: string) => {
    const featureViewIsMissing = !getFeatureViewNode(fvId);

    if (featureViewIsMissing) {
      addFeatureView(fvId);
    }

    const featureView = idToFco[fvId] as FeatureViewFCO;
    const offlineReadPath: DataFlowPathProperties = {
      isOfflineReadPath: true,
    };

    // Downstream side: either straight to the feature service or via the ODFV
    if (!odfvId) {
      addEdge(fvId, fsId, offlineReadPath, fsId);
    } else {
      addEdge(fvId, odfvId, offlineReadPath, fsId);

      addEdge(odfvId, fsId, offlineReadPath, fsId);
    }

    // Upstream side: every data source feeding the feature view
    for (const dataSourceHash of featureView.dataSourceIds) {
      const dataSourceId = hashToId[dataSourceHash];
      const dataSourceIsMissing = !getDataSourceNode(dataSourceId);

      if (dataSourceIsMissing) {
        addDataSource(dataSourceId);
      }

      addEdge(dataSourceId + '_batch_source', dataSourceId, offlineReadPath, fsId);

      addEdge(dataSourceId, fvId, offlineReadPath, fsId);
    }
  };

  /**
   * Records a feature service as downstream of a feature view and adds the
   * read paths between them.
   *
   * - Offline: when the feature view materializes offline, the read goes
   *   `OFFLINE_STORE` -> `store-output` and `STORE` -> feature service;
   *   otherwise a direct offline read on the feature view is registered.
   * - Online: when the feature service has online serving enabled, the read
   *   goes `ONLINE_STORE` -> `store-output` and `STORE` -> feature service.
   *
   * @param fsId id of the downstream feature service
   * @param fvId id of the upstream feature view (its node is created if missing)
   */
  const registerDownstreamFeatureServiceOnFeatureView = (fsId: string, fvId: string) => {
    const featureViewNode = getFeatureViewNode(fvId) || addFeatureView(fvId);
    const featureView = idToFco[fvId] as FeatureViewFCO;
    const featureService = idToFco[fsId] as FeatureServiceFCO;

    const alreadyLinked = featureViewNode.downstreamNodes?.includes(fsId);

    if (!alreadyLinked) {
      featureViewNode.downstreamNodes?.push(fsId);
    }

    // Offline read path
    if (getMaterializationConfig(featureView, 'offline')) {
      addEdge('OFFLINE_STORE', 'store-output', { isOfflineReadPath: true }, fsId);

      addEdge('STORE', fsId, { isOfflineReadPath: true }, fsId);
    } else {
      registerDirectOfflineReadOnFeatureView(fvId, fsId);
    }

    // Online serving path
    if (featureService.isOnlineServingEnabled) {
      addEdge('ONLINE_STORE', 'store-output', { isOnlineServingPath: true }, fsId);

      addEdge('STORE', fsId, { isOnlineServingPath: true }, fsId);
    }
  };

  /**
   * Records an ODFV as downstream of one of its input feature views and draws
   * the connecting edges. No path properties are attached here because the
   * readers (feature services) are not known at this point.
   *
   * @param odfvId id of the downstream ODFV
   * @param fvId id of the input feature view (its node is created if missing)
   */
  const registerDownstreamODFVonFeatureView = (odfvId: string, fvId: string) => {
    const inputNode = getFeatureViewNode(fvId) || addFeatureView(fvId);

    const inputFeatureView = idToFco[fvId] as FeatureViewFCO;

    // Input FV writes to the online store: connect the online store
    // to the ODFV, without animations for now.
    if (getMaterializationConfig(inputFeatureView, 'online')) {
      addEdge('ONLINE_STORE', 'store-output');
      addEdge('STORE', odfvId);
    }

    if (getMaterializationConfig(inputFeatureView, 'offline')) {
      // Offline materialization ENABLED: the ODFV reads through the store.
      addEdge('OFFLINE_STORE', 'store-output');
      addEdge('STORE', odfvId);
    } else {
      // Offline materialization DISABLED: connect the FV to the ODFV
      // directly, since model training reads straight from the Data Source
      // via the input Feature View.
      addEdge(fvId, odfvId);
    }

    const alreadyLinked = inputNode.downstreamNodes?.includes(odfvId);

    if (!alreadyLinked) {
      inputNode.downstreamNodes?.push(odfvId);
    }
  };

  /**
   * Registers the node for a realtime (on-demand) feature view. If a node with
   * this id already exists it is returned unchanged.
   *
   * - Default mode: every input feature view is recorded as upstream and
   *   linked through `registerDownstreamODFVonFeatureView`.
   * - `doNotTraceUpstream`: the node is created without touching its inputs.
   * - `isAnchor`: for each feature service of the ODFV the pipeline is unpacked
   *   into transformation nodes and the store / feature service edges are
   *   added. When the ODFV has no feature service, the pipeline is still
   *   unpacked (or, for a JOIN_INPUTS root, its inputs are connected directly).
   *
   * @param odfvId id of the ODFV; must be a key of `idToFco`
   * @param options.isAnchor render this ODFV as the anchor of the graph
   * @param options.doNotTraceUpstream skip registering input feature views
   * @returns the value returned by `addNode` for the new node, or the existing node
   */
  const addODFV = (odfvId: string, options?: { isAnchor?: boolean; doNotTraceUpstream?: boolean }) => {
    const existingNode = getODFVNode(odfvId);

    if (existingNode) {
      return existingNode;
    }
    const odfv = idToFco[odfvId] as FeatureViewFCO;

    const upstreamNodes: string[] = [];

    const inputFVlist = getDependentFeatureViewIds(odfv, hashToId);

    if (!options?.isAnchor && !options?.doNotTraceUpstream) {
      inputFVlist.forEach((inputFvId) => {
        upstreamNodes.push(inputFvId);

        registerDownstreamODFVonFeatureView(odfvId, inputFvId);
      });
    }

    if (options?.isAnchor) {
      let someInputWritesToOfflineStore = false;
      let someInputWritesToOnlineStore = false;

      inputFVlist.forEach((inputFvId) => {
        const inputFV = idToFco[inputFvId] as FeatureViewFCO;

        if (getMaterializationConfig(inputFV, 'offline')) {
          someInputWritesToOfflineStore = true;
        }

        if (getMaterializationConfig(inputFV, 'online')) {
          someInputWritesToOnlineStore = true;
        }
      });

      odfv.featureServices.forEach((fsHash) => {
        const fsId = hashToId[fsHash];
        const fs = idToFco[fsId] as FeatureServiceFCO;

        // Dataflow Properties for this Feature Service
        const fsDataflowProperties: DataFlowPathProperties = {
          isOnlineServingPath: fs.isOnlineServingEnabled,
          isOfflineReadPath: true,
        };

        addFeatureService(fsId, {
          doNotTraceUpstream: true,
        });

        addEdge(odfvId, fsId, fsDataflowProperties, fsId);

        if (odfv.pipeline?.root && odfv.pipeline?.root.nodeType !== PipelineNodeTypes.JOIN_INPUTS) {
          // Trace through the whole transformation graph
          const rootTransformationNode = unpackPipeline(odfv.pipeline.root, {
            nodeId: fsId,
            dataflowPathProperties: fsDataflowProperties,
          });

          // This runs once per feature service, so guard against recording
          // the same root transformation / ODFV link more than once.
          if (!upstreamNodes.includes(rootTransformationNode.id)) {
            upstreamNodes.push(rootTransformationNode.id);
          }

          if (!rootTransformationNode.downstreamNodes?.includes(odfvId)) {
            rootTransformationNode.downstreamNodes?.push(odfvId);
          }

          // Feature Service Reading from the store
          if (someInputWritesToOfflineStore || someInputWritesToOnlineStore) {
            addEdge(
              'STORE',
              rootTransformationNode.id,
              {
                isOnlineServingPath: someInputWritesToOnlineStore,
                isOfflineReadPath: someInputWritesToOfflineStore,
              },
              fsId
            );
          }

          // Reading from the online store
          if (someInputWritesToOnlineStore) {
            addEdge(
              'ONLINE_STORE',
              'store-output',
              {
                isOnlineServingPath: fsDataflowProperties.isOnlineServingPath,
              },
              fsId
            );
          }

          // Reading from the offline store
          if (someInputWritesToOfflineStore) {
            addEdge(
              'OFFLINE_STORE',
              'store-output',
              {
                isOfflineReadPath: fsDataflowProperties.isOfflineReadPath,
              },
              fsId
            );
          }

          // From the root transformation node to the ODFV node
          addEdge(rootTransformationNode.id, odfvId, fsDataflowProperties, fsId);
        }
      });

      if (odfv.featureServices.length === 0) {
        // `root` is accessed with optional chaining and strict equality so a
        // pipeline without a root falls through instead of throwing.
        if (odfv.pipeline?.root?.nodeType === PipelineNodeTypes.JOIN_INPUTS) {
          // This is handled differently since Realtime FV with Calculations doesn't have a transformation_node
          addJoinInputsEdge(odfvId, odfv?.pipeline?._getAllNodesInDAG ?? [], data.idToFcoMap);
        } else if (odfv.pipeline?.root) {
          const rootTransformationNode = unpackPipeline(odfv.pipeline.root, {
            nodeId: odfvId,
            dataflowPathProperties: {},
          });

          if (!upstreamNodes.includes(rootTransformationNode.id)) {
            upstreamNodes.push(rootTransformationNode.id);
          }

          if (!rootTransformationNode.downstreamNodes?.includes(odfvId)) {
            rootTransformationNode.downstreamNodes?.push(odfvId);
          }

          addEdge(rootTransformationNode.id, odfvId);
        }
      }
    }

    const odfvNode: ODFVNode = {
      id: odfvId,
      name: odfv.name || odfvId,
      description: odfv.description || '',
      type: 'odfv',
      schemaCount: odfv.features.length,
      upstreamNodes,
      downstreamNodes: odfv.featureServices.map((fsHash) => hashToId[fsHash]),
    };

    if (options?.isAnchor) {
      odfvNode.isAnchor = true;
    }

    return addNode<ODFVNode>(odfvNode);
  };

  /**
   * Records a feature service as downstream of an ODFV and adds the read paths
   * that the feature service lights up.
   *
   * Steps, in order:
   * 1. link ODFV -> feature service;
   * 2. if the ODFV has a request data source, register `REQ_<fsId>` as upstream
   *    of the ODFV node;
   * 3. if the ODFV has input feature views:
   *    - offline: read through the store when any input materializes offline,
   *      and register a direct offline read on each input that does not;
   *    - online (only when the feature service serves online): read through
   *      the store when any input materializes online, and connect each input
   *      that does not straight to the ODFV.
   *
   * @param fsId id of the downstream feature service
   * @param odfvId id of the upstream ODFV (its node is created if missing)
   */
  const registerDownstreamFeatureServiceOnODFV = (fsId: string, odfvId: string) => {
    const odfvNode = getODFVNode(odfvId) || addODFV(odfvId);

    const featureService = idToFco[fsId] as FeatureServiceFCO;
    const odfvFco = idToFco[odfvId] as FeatureViewFCO;

    const alreadyLinked = odfvNode.downstreamNodes?.includes(fsId);

    if (!alreadyLinked) {
      odfvNode.downstreamNodes?.push(fsId);
    }

    addEdge(
      odfvId,
      fsId,
      {
        isOnlineServingPath: featureService.isOnlineServingEnabled,
        isOfflineReadPath: true,
      },
      fsId
    );

    if (getRequestDataSourceFromODFV(odfvFco)) {
      const requestDataSourceID = 'REQ_' + fsId;

      if (odfvNode.upstreamNodes) {
        odfvNode.upstreamNodes.push(requestDataSourceID);
      } else {
        odfvNode.upstreamNodes = [requestDataSourceID];
      }
    }

    const inputIds = getDependentFeatureViewIds(odfvFco, hashToId);

    // Without input feature views there is nothing coming out of the store.
    if (inputIds.length === 0) {
      return;
    }

    const inputFeatureViews = inputIds.map((inputId) => idToFco[inputId] as FeatureViewFCO);

    // Offline, part 1: some input writes to the offline store,
    // so there is an offline read path from the store to the ODFV.
    if (inputFeatureViews.some((inputFV) => getMaterializationConfig(inputFV, 'offline'))) {
      addEdge('OFFLINE_STORE', 'store-output', { isOfflineReadPath: true }, fsId);

      addEdge('STORE', odfvId, { isOfflineReadPath: true }, fsId);
    }

    // Offline, part 2: inputs with offline materialization disabled
    // are read directly.
    const inputsNotMaterializingOffline = inputFeatureViews.filter(
      (inputFV) => !getMaterializationConfig(inputFV, 'offline')
    );

    for (const inputFV of inputsNotMaterializingOffline) {
      registerDirectOfflineReadOnFeatureView(idFormatter(inputFV), fsId, odfvId);
    }

    // Online: only relevant when the feature service serves online.
    if (!featureService.isOnlineServingEnabled) {
      return;
    }

    const onlineServingPath: DataFlowPathProperties = {
      isOnlineServingPath: true,
    };

    // The online serving animation only lights up if something
    // upstream is actually writing to the online store.
    if (inputFeatureViews.some((inputFV) => getMaterializationConfig(inputFV, 'online'))) {
      addEdge('ONLINE_STORE', 'store-output', onlineServingPath, fsId);
      addEdge('STORE', odfvId, onlineServingPath, fsId);
    }

    // ... otherwise, online serving goes directly to the
    // Feature View? TODO: Check if this actually works this way
    const inputsNotMaterializingOnline = inputFeatureViews.filter(
      (inputFV) => !getMaterializationConfig(inputFV, 'online')
    );

    for (const inputFV of inputsNotMaterializingOnline) {
      addEdge(idFormatter(inputFV), odfvId, onlineServingPath, fsId);
    }
  };

  /**
   * Registers the node for a Feature Service together with its companion
   * nodes: a `Trainer_<fsId>` model trainer, an `Inference_<fsId>` model
   * inference node and, when any realtime feature view of the service has a
   * request data source, a `REQ_<fsId>` request data source node.
   *
   * Unless `doNotTraceUpstream` is set, every feature view of the service is
   * recorded as upstream and linked through
   * `registerDownstreamFeatureServiceOnODFV` (realtime feature views) or
   * `registerDownstreamFeatureServiceOnFeatureView` (all others).
   *
   * If a node with this id already exists it is returned unchanged.
   *
   * @param fsId id of the feature service; must be a key of `idToFco`
   * @param options.isAnchor render this feature service as the anchor of the graph
   * @param options.doNotTraceUpstream skip registering upstream feature views
   * @returns the value returned by `addNode` for the new node, or the existing node
   */
  const addFeatureService = (
    fsId: string,
    options?: {
      isAnchor?: boolean;
      doNotTraceUpstream?: boolean;
    }
  ) => {
    const existingNode = getFeatureServiceNode(fsId);

    if (existingNode) {
      return existingNode;
    }

    const fs = idToFco[fsId] as FeatureServiceFCO;

    // Ids of realtime feature views that declare a request data source
    const odfvWithRequestDataSource: string[] = [];
    const upstreamNodes: string[] = [];

    if (!options?.doNotTraceUpstream) {
      fs.allFeatureViews.forEach((fvHash) => {
        const fvId = hashToId[fvHash];

        const fv = idToFco[fvId] as FeatureViewFCO;

        upstreamNodes.push(fvId);

        if (fv.isRealtime) {
          if (getRequestDataSourceFromODFV(fv)) {
            odfvWithRequestDataSource.push(fvId);
          }

          registerDownstreamFeatureServiceOnODFV(fsId, fvId);
        } else {
          /**
           * If the upstream FV is not realtime,
           * then we only draw a line between
           * the FV and the FS if the FV is not
           * materializing to the offline store
           *
           * This is because the only time a FS
           * is directly connected to a FV is if
           * it reads directly during a
           * getHistoricalFeature call, and that
           * only happens if the FV has
           * offlineMaterialization = false
           */
          registerDownstreamFeatureServiceOnFeatureView(fsId, fvId);
        }
      });
    }

    if (odfvWithRequestDataSource.length > 0) {
      const schemaList = odfvWithRequestDataSource.map<ODFVSchemaEntry>((odfvId) => {
        return collectSchemaFromRequestDataSources(odfvId, idToFco);
      });

      // Total number of request schema entries across all contributing ODFVs
      const requestSchemaCount = schemaList.reduce((count, entry) => {
        if (entry.schema) {
          return count + entry.schema.length;
        } else {
          return count;
        }
      }, 0);

      const requestDataSourceID = 'REQ_' + fsId;

      const requestDataSourceNode: RequestNode = {
        id: requestDataSourceID,
        name: 'Request Data Source',
        type: 'request_data_source',
        schemaCount: requestSchemaCount,
        downstreamNodes: odfvWithRequestDataSource,
        upstreamNodes: [fsId],
      };

      addNode(requestDataSourceNode);
    }

    // Model trainer: always reads offline from the feature service
    const trainerNode: ModelTrainerNode = {
      id: 'Trainer_' + fsId,
      name: 'Model Trainer',
      type: 'model_trainer',
      upstreamNodes: [fsId],
      featureServiceId: fsId,
      featureServiceName: fs.name || fsId,
    };

    addEdge(
      fsId,
      trainerNode.id,
      {
        isOfflineReadPath: true,
      },
      fsId
    );

    addNode(trainerNode);

    // Model inference: online serving when enabled, offline read otherwise
    const inferenceNode: ModelInferenceNode = {
      id: 'Inference_' + fsId,
      name: 'Model Inference',
      type: 'model_inference',
      inferenceType: fs.isOnlineServingEnabled ? 'real_time' : 'batch',
      upstreamNodes: [fsId],
      featureServiceId: fsId,
      featureServiceName: fs.name || fsId,
    };

    addEdge(
      fsId,
      inferenceNode.id,
      {
        isOnlineServingPath: fs.isOnlineServingEnabled,
        isOfflineReadPath: !fs.isOnlineServingEnabled,
      },
      fsId
    );

    addNode(inferenceNode);

    const fvSchemas = collectFeatureServiceSchemaFromFVs(
      idToFco,
      fs.allFeatureViews.map((fvHash) => hashToId[fvHash])
    );

    // Total number of features across the collected feature view schemas
    const schemaCount = fvSchemas.reduce((count, entry) => {
      return count + entry.features.length;
    }, 0);

    const featureServiceNode: FeatureServiceNode = {
      id: fsId,
      name: fs.name || fsId,
      description: fs.description || '',
      type: 'feature_service',
      schemaCount,
      isOnlineServingEnabled: !!fs.isOnlineServingEnabled,
      hasRequestDataSource: odfvWithRequestDataSource.length > 0,
      upstreamNodes,
      downstreamNodes: [trainerNode.id, inferenceNode.id],
    };

    if (options?.isAnchor) {
      featureServiceNode.isAnchor = true;
    }

    return addNode(featureServiceNode);
  };

  let name = workspace;
  let templateType: DataFlowTemplateType;

  /**
   * Helpers are in place; now build the graph.
   */
  if (anchor) {
    /**
     * With an anchor, only the nodes connected
     * to the anchor are processed.
     */
    const anchorId = anchor.id;
    name += '/' + anchor.id;

    switch (anchor?.type) {
      case FCOType.DATA_SOURCE:
        templateType = 'data_source';
        // TODO: Handle data source anchors
        break;

      case FCOType.FEATURE_VIEW: {
        const anchorIsRealtime = (idToFco[anchor.id] as FeatureViewFCO).isRealtime;

        if (anchorIsRealtime) {
          templateType = 'ondemand_fv';
          addODFV(anchorId, { isAnchor: true });
        } else {
          templateType = 'materializing_fv';
          addFeatureView(anchorId, { isAnchor: true });
        }

        break;
      }

      case FCOType.FEATURE_SERVICE:
        templateType = 'feature_service';
        addFeatureService(anchorId, { isAnchor: true });
        break;

      default:
        templateType = 'feature_service';
    }
  } else {
    /**
     * Without an anchor the whole workspace is processed, in this order:
     * non-realtime feature views, data sources, realtime feature views
     * (ODFVs) and finally feature services.
     */
    const batchAndStreamFeatureViews = data.featureViews.filter((fv) => !fv.isRealtime);

    for (const fvFCO of batchAndStreamFeatureViews) {
      addFeatureView(idFormatter(fvFCO));
    }

    for (const ds of data.dataSources) {
      addDataSource(idFormatter(ds));
    }

    const realtimeFeatureViews = data.featureViews.filter((fv) => fv.isRealtime);

    for (const odfv of realtimeFeatureViews) {
      addODFV(idFormatter(odfv));
    }

    for (const fs of data.featureServices) {
      addFeatureService(idFormatter(fs));
    }

    templateType = 'feature_service';
  }

  /**
   * Post Processing
   */

  // Sort nodes by "<type> : <name>" to stabilize snapshot output.
  nodes.sort((a, b) => {
    const keyA = `${a.type} : ${a.name}`;
    const keyB = `${b.type} : ${b.name}`;

    return keyA.localeCompare(keyB);
  });

  const storeIsOnTheGraph = edges.some((e) => e.source === 'STORE' || e.target === 'STORE');

  // The store nodes are only added when some edge touches the store.
  // This is necessary for empty workspaces not to crash: otherwise
  // having store nodes crashes the diagram.
  if (storeIsOnTheGraph) {
    addNode({
      id: 'STORE',
      type: 'store_wrapper',
      name: 'store',
    });
    addNode({
      id: 'ONLINE_STORE',
      name: 'Online Store',
      type: 'online_store',
      storeType: 'Redis',
    });
    addNode({
      id: 'OFFLINE_STORE',
      name: 'Offline Store',
      type: 'offline_store',
      storeType: 'S3',
    });
    addNode({
      id: 'store-input',
      type: 'store_input',
      name: 'Store Input',
    });
    addNode({
      id: 'store-output',
      type: 'store_output',
      name: 'Store Output',
    });
  }

  // Sort edges by their id, again for stable output.
  edges.sort((a, b) => edgeIdFn(a).localeCompare(edgeIdFn(b), 'en'));

  return {
    name,
    nodesList: nodes,
    edgesList: edges,
    templateType,
  };
};

export {
  buildHashToIdMap,
  buildMapWithNameAsKey,
  collectSchemaFromRequestDataSources,
  dataflowSpecSelector,
  getDependentFeatureViewIds,
  getRequestDataSourceFromODFV,
  idFormatter,
};
