import { cloneDeep, differenceWith, intersectionWith, isEqual, map, omit, set } from 'lodash';

import { History } from '@tigergraph/tools-models/history';
import {
  AlterVertexOrEdge,
  AlterVertexAttributeIndex,
  Edge,
  Graph,
  GSQLAttributeJson,
  GSQLEdgeJson,
  GSQLGraphJson,
  GSQLVertexJson,
  SchemaChange,
  SchemaChangeType,
} from '@tigergraph/tools-models/topology';

export class SchemaChangeLogicService {
  // Intentionally empty: takes no arguments and performs no initialization.
  constructor() {}

  /**
   * Generate schema change jobs based on schema change history.
   * 1. Generate the raw schema change object.
   * 2. Generate at most 4 schema change jobs, each job in charge of the following section. If the
   *    corresponding section is empty, that job is omitted.
   *    job1: drop local edge and vertex types
   *    job2: drop global edge and vertex types
   *    job3: add global vertex and edge types
   *    job4: add local vertex and edge types
   *    When `isGlobalView` is false, job2 and job3 are replaced by a single job whose `graphs`
   *    entry lists the names of the global types to drop from / add to the graph named by the
   *    final schema.
   * 3. Piggyback the alter local vertex and edge types sections onto the first job in the above
   *    list that has no `graphs` entry. If there is no such job, append one.
   * 4. Piggyback the alter global vertex and edge types sections onto the first job of the list
   *    (`result[0]`). If the list is empty, append one.
   * 5. Split every job in which an altered type drops and adds an attribute with the same name
   *    into two jobs: the first holds everything except the added attributes, the second holds
   *    only the added attributes.
   * 6. Generate at most 2 schema change jobs, each job in charge of the following section. If the
   *    corresponding section is empty, that job is omitted.
   *    job5: add attribute index for global vertex types (only when `isGlobalView` is true)
   *    job6: add attribute index for local vertex types
   * 7. When `isGlobalView` is false, append the reverse edge name after each entry of every job's
   *    top-level drop edge list.
   *
   * @param {History<Graph>} schemaChangeHistory the recorded schema snapshots; record 0 is treated
   *        as the original schema and the last record as the final schema
   * @param {boolean} isGlobalView selects between the global and the per-graph form of job2/job3,
   *        and controls whether job5 and the final reverse-edge step run (see above)
   * @returns {SchemaChange[]} the jobs in the order they were assembled; empty when the history
   *          holds no record
   * @memberof SchemaChangeLogicService
   */
  getSchemaChangeJobs(schemaChangeHistory: History<Graph>, isGlobalView: boolean): SchemaChange[] {
    // If no schema in history, then no schema change jobs needed.
    if (schemaChangeHistory.getHistoryLength() < 1) {
      return [];
    }

    // Get the original and final schema (used to decide whether a vertex or edge type is local).
    const originalSchema = schemaChangeHistory.getRecord(0);
    const finalSchema = schemaChangeHistory.getRecord(schemaChangeHistory.getHistoryLength() - 1);
    // Get the overall schema change.
    const rawSchemaChange = this.getSchemaChange(schemaChangeHistory);

    // Get the global and local part of each section in overall schema change.
    // Alter
    // NOTE(review): altered edge types are classified by their IsLocal flag in the ORIGINAL schema,
    // while altered vertex types below are classified by the FINAL schema — confirm this asymmetry
    // is intended.
    const alterGlobalEdgeTypes = rawSchemaChange.alterEdgeTypes.filter(
      (alterEdge) => !originalSchema.getEdge(alterEdge.name).dumpToGSQLJson().IsLocal
    );
    const alterLocalEdgeTypes = rawSchemaChange.alterEdgeTypes.filter(
      (alterEdge) => originalSchema.getEdge(alterEdge.name).dumpToGSQLJson().IsLocal
    );
    // Split add attribute index jobs and all other alter jobs.
    // (getAddAttributeIndexJobs / getAlterJobsExceptAddAttributeIndex are defined outside this
    // block; presumably they partition each alter item into its add-index part and the rest.)
    let alterGlobalVertexTypes = rawSchemaChange.alterVertexTypes.filter(
      (alterVertex) => !finalSchema.getVertex(alterVertex.name).dumpToGSQLJson().IsLocal
    );
    const addAttributeIndexGlobalVertexTypes = this.getAddAttributeIndexJobs(alterGlobalVertexTypes);
    alterGlobalVertexTypes = this.getAlterJobsExceptAddAttributeIndex(alterGlobalVertexTypes);

    let alterLocalVertexTypes = rawSchemaChange.alterVertexTypes.filter(
      (alterVertex) => finalSchema.getVertex(alterVertex.name).dumpToGSQLJson().IsLocal
    );
    const addAttributeIndexLocalVertexTypes = this.getAddAttributeIndexJobs(alterLocalVertexTypes);
    alterLocalVertexTypes = this.getAlterJobsExceptAddAttributeIndex(alterLocalVertexTypes);

    // Drop
    // Dropped types no longer exist in the final schema, so their locality is read from the original one.
    const dropGlobalEdgeTypes = rawSchemaChange.dropEdgeTypes.filter(
      (dropEdge) => !originalSchema.getEdge(dropEdge).dumpToGSQLJson().IsLocal
    );
    const dropLocalEdgeTypes = rawSchemaChange.dropEdgeTypes.filter(
      (dropEdge) => originalSchema.getEdge(dropEdge).dumpToGSQLJson().IsLocal
    );
    const dropGlobalVertexTypes = rawSchemaChange.dropVertexTypes.filter(
      (dropVertex) => !originalSchema.getVertex(dropVertex).dumpToGSQLJson().IsLocal
    );
    const dropLocalVertexTypes = rawSchemaChange.dropVertexTypes.filter(
      (dropVertex) => originalSchema.getVertex(dropVertex).dumpToGSQLJson().IsLocal
    );

    // Add
    // Added types are full JSON definitions, so their own IsLocal flag is used directly.
    const addGlobalVertexTypes = rawSchemaChange.addVertexTypes.filter((addVertex) => !addVertex.IsLocal);
    const addLocalVertexTypes = rawSchemaChange.addVertexTypes.filter((addVertex) => addVertex.IsLocal);
    const addGlobalEdgeTypes = rawSchemaChange.addEdgeTypes.filter((addEdge) => !addEdge.IsLocal);
    const addLocalEdgeTypes = rawSchemaChange.addEdgeTypes.filter((addEdge) => addEdge.IsLocal);

    const result: SchemaChange[] = [];
    // job1: drop local edge and vertex types
    if (dropLocalEdgeTypes.length > 0 || dropLocalVertexTypes.length > 0) {
      result.push({
        alterEdgeTypes: [],
        alterVertexTypes: [],
        dropEdgeTypes: dropLocalEdgeTypes,
        dropVertexTypes: dropLocalVertexTypes,
        addVertexTypes: [],
        addEdgeTypes: [],
      });
    }

    if (!isGlobalView) {
      // Add graph related global schema change
      if (
        dropGlobalEdgeTypes.length > 0 ||
        dropGlobalVertexTypes.length > 0 ||
        addGlobalVertexTypes.length > 0 ||
        addGlobalEdgeTypes.length > 0
      ) {
        // Only the type names are carried in the `graphs` entry, not the full definitions.
        const addVertexTypes = addGlobalVertexTypes.map((vt) => vt.Name);
        const addEdgeTypes = addGlobalEdgeTypes.map((et) => et.Name);
        // Names that show up in both the global add lists and the global drop lists; they are
        // removed from all four lists below.
        // NOTE(review): presumably a global type that is both dropped and re-added needs no change
        // of graph membership — confirm.
        const ignoreTypes: string[] = intersectionWith(
          addVertexTypes.concat(addEdgeTypes),
          dropGlobalVertexTypes.concat(dropGlobalEdgeTypes),
          isEqual
        );
        // job2: drop global edge and vertex types (for local graphs).
        const updatedDropEdgeList: string[] = differenceWith(dropGlobalEdgeTypes, ignoreTypes, isEqual);
        const updatedDropVertexList: string[] = differenceWith(dropGlobalVertexTypes, ignoreTypes, isEqual);

        // job3: add global vertex and edge types (for local graphs).
        const updatedAddVertexList: string[] = differenceWith(addVertexTypes, ignoreTypes, isEqual);
        const updatedAddEdgeList: string[] = differenceWith(addEdgeTypes, ignoreTypes, isEqual);

        // Last step, add reverse edges to add/drop edge list
        // (GSQL needs reversed edge name for schema change jobs).
        // Iterate over a copy of each list because the list itself is appended to inside the loop.
        cloneDeep(updatedDropEdgeList).forEach((edge) => {
          const edgeType = originalSchema.getEdge(edge);
          if (edgeType.hasReverseEdge) {
            updatedDropEdgeList.push(edgeType.reverseEdge);
          }
        });
        cloneDeep(updatedAddEdgeList).forEach((edge) => {
          const edgeType = finalSchema.getEdge(edge);
          if (edgeType.hasReverseEdge) {
            updatedAddEdgeList.push(edgeType.reverseEdge);
          }
        });

        // Everything may have been cancelled out by ignoreTypes, so re-check before emitting the job.
        if (
          updatedDropEdgeList.length > 0 ||
          updatedDropVertexList.length > 0 ||
          updatedAddVertexList.length > 0 ||
          updatedAddEdgeList.length > 0
        ) {
          result.push({
            alterEdgeTypes: [],
            alterVertexTypes: [],
            dropEdgeTypes: [],
            dropVertexTypes: [],
            addVertexTypes: [],
            addEdgeTypes: [],
            graphs: [
              {
                graphName: finalSchema.name,
                dropEdgeTypes: updatedDropEdgeList,
                dropVertexTypes: updatedDropVertexList,
                addVertexTypes: updatedAddVertexList,
                addEdgeTypes: updatedAddEdgeList,
              },
            ],
          });
        }
      }
    } else {
      // job2: drop global edge and vertex types (for global).
      if (dropGlobalEdgeTypes.length > 0 || dropGlobalVertexTypes.length > 0) {
        result.push({
          alterEdgeTypes: [],
          alterVertexTypes: [],
          dropEdgeTypes: dropGlobalEdgeTypes,
          dropVertexTypes: dropGlobalVertexTypes,
          addVertexTypes: [],
          addEdgeTypes: [],
        });
      }

      // job3: add global vertex and edge types (for global).
      if (addGlobalEdgeTypes.length > 0 || addGlobalVertexTypes.length > 0) {
        result.push({
          alterEdgeTypes: [],
          alterVertexTypes: [],
          dropEdgeTypes: [],
          dropVertexTypes: [],
          addVertexTypes: addGlobalVertexTypes,
          addEdgeTypes: addGlobalEdgeTypes,
        });
      }
    }

    // job4: add local vertex and edge types
    if (addLocalEdgeTypes.length > 0 || addLocalVertexTypes.length > 0) {
      result.push({
        alterEdgeTypes: [],
        alterVertexTypes: [],
        dropEdgeTypes: [],
        dropVertexTypes: [],
        addVertexTypes: addLocalVertexTypes,
        addEdgeTypes: addLocalEdgeTypes,
      });
    }
    // Piggyback the local alter vertex and edge type sections, or create one dedicated
    // schema change object if nowhere to piggyback.
    // Only the first job without a `graphs` entry is eligible as a host.
    if (alterLocalVertexTypes.length > 0 || alterLocalEdgeTypes.length > 0) {
      let piggybackLocalAlter = false;
      for (let i = 0; i < result.length; i++) {
        if (!result[i].graphs) {
          result[i].alterVertexTypes = alterLocalVertexTypes;
          result[i].alterEdgeTypes = alterLocalEdgeTypes;
          piggybackLocalAlter = true;
          break;
        }
      }
      if (!piggybackLocalAlter) {
        result.push({
          alterEdgeTypes: alterLocalEdgeTypes,
          alterVertexTypes: alterLocalVertexTypes,
          dropEdgeTypes: [],
          dropVertexTypes: [],
          addVertexTypes: [],
          addEdgeTypes: [],
        });
      }
    }
    // Piggyback the global alter vertex and edge type sections, or create one dedicated
    // schema change object if nowhere to piggyback.
    // NOTE(review): the host is always result[0], whatever kind of job that is, and the assignment
    // replaces any local alter sections that were piggybacked onto result[0] just above — confirm
    // that local and global alters can never both be non-empty in the same call.
    if (alterGlobalVertexTypes.length > 0 || alterGlobalEdgeTypes.length > 0) {
      if (result.length > 0) {
        result[0].alterVertexTypes = alterGlobalVertexTypes;
        result[0].alterEdgeTypes = alterGlobalEdgeTypes;
      } else {
        result.push({
          alterEdgeTypes: alterGlobalEdgeTypes,
          alterVertexTypes: alterGlobalVertexTypes,
          dropEdgeTypes: [],
          dropVertexTypes: [],
          addVertexTypes: [],
          addEdgeTypes: [],
        });
      }
    }

    // If one schema change job contains altering vertex or edge types that drop and add an attribute
    // with same name, then split the job into 2 jobs, the 2nd one contains the adding attributes
    // part and the 1st one contains everything else (GSQL cannot drop and add attributes with same name)
    // `splice` empties `result` in place and hands back its former content, so the loop below can
    // refill `result` in the same order.
    const tmpResult = result.splice(0, result.length);
    tmpResult.forEach((schemaChange) => {
      let needSplit = false;
      const alterTypes = schemaChange.alterVertexTypes.concat(schemaChange.alterEdgeTypes);
      alterTypes.forEach((alterType) => {
        // dropAttributes holds attribute names, addAttributes holds attribute definitions.
        if (
          intersectionWith(
            alterType.dropAttributes,
            alterType.addAttributes.map((attr) => attr.AttributeName),
            isEqual
          ).length > 0
        ) {
          needSplit = true;
        }
      });
      // NOTE(review): both split jobs are built from scratch and copy neither `schemaChange.graphs`
      // nor any `addIndexAttributes` — confirm a job that needs splitting can never carry them.
      if (needSplit) {
        // The 1st splitted job: containing everything except added attributes.
        result.push({
          alterEdgeTypes: schemaChange.alterEdgeTypes
            .filter((alterEdgeType) => alterEdgeType.dropAttributes.length > 0)
            .map((alterEdgeType) => {
              return {
                name: alterEdgeType.name,
                dropAttributes: alterEdgeType.dropAttributes,
                addAttributes: [],
                addIndexAttributes: [],
                dropIndexAttributes: [],
              };
            }),
          alterVertexTypes: schemaChange.alterVertexTypes
            .filter(
              (alterVertexType) =>
                alterVertexType.dropAttributes.length > 0 || alterVertexType.dropIndexAttributes.length > 0
            )
            .map((alterVertexType) => {
              return {
                name: alterVertexType.name,
                dropAttributes: alterVertexType.dropAttributes,
                addAttributes: [],
                addIndexAttributes: [],
                dropIndexAttributes: alterVertexType.dropIndexAttributes,
              };
            }),
          dropEdgeTypes: schemaChange.dropEdgeTypes,
          dropVertexTypes: schemaChange.dropVertexTypes,
          addVertexTypes: schemaChange.addVertexTypes,
          addEdgeTypes: schemaChange.addEdgeTypes,
        });
        // The 2nd splitted job: containing only added attributes.
        result.push({
          alterEdgeTypes: schemaChange.alterEdgeTypes
            .filter((alterEdgeType) => alterEdgeType.addAttributes.length > 0)
            .map((alterEdgeType) => {
              return {
                name: alterEdgeType.name,
                dropAttributes: [],
                addAttributes: alterEdgeType.addAttributes,
                addIndexAttributes: [],
                dropIndexAttributes: [],
              };
            }),
          alterVertexTypes: schemaChange.alterVertexTypes
            .filter((alterVertexType) => alterVertexType.addAttributes.length > 0)
            .map((alterVertexType) => {
              return {
                name: alterVertexType.name,
                dropAttributes: [],
                addAttributes: alterVertexType.addAttributes,
                addIndexAttributes: [],
                dropIndexAttributes: [],
              };
            }),
          dropEdgeTypes: [],
          dropVertexTypes: [],
          addVertexTypes: [],
          addEdgeTypes: [],
        });
      } else {
        result.push(schemaChange);
      }
    });

    // job5: add attribute index for global vertex types
    // (skipped entirely when not in the global view)
    if (isGlobalView && addAttributeIndexGlobalVertexTypes.length > 0) {
      result.push({
        alterEdgeTypes: [],
        alterVertexTypes: addAttributeIndexGlobalVertexTypes,
        dropEdgeTypes: [],
        dropVertexTypes: [],
        addVertexTypes: [],
        addEdgeTypes: [],
      });
    }

    // job6: add attribute index for local vertex types
    if (addAttributeIndexLocalVertexTypes.length > 0) {
      result.push({
        alterEdgeTypes: [],
        alterVertexTypes: addAttributeIndexLocalVertexTypes,
        dropEdgeTypes: [],
        dropVertexTypes: [],
        addVertexTypes: [],
        addEdgeTypes: [],
      });
    }

    // Last step, add reverse edges to drop edge list
    // (GSQL not treating reverse edge as part of edge for schema change jobs).
    // This rewrites only the top-level dropEdgeTypes of each job; the lists inside `graphs` already
    // received their reverse edges when they were built.
    if (!isGlobalView) {
      result.forEach((schemaChange) => {
        const dropEdgeTypes: string[] = [];
        schemaChange.dropEdgeTypes.forEach((eType) => {
          dropEdgeTypes.push(eType);
          // The reverse edge name is read from the REVERSE_EDGE entry of the edge's Config, if any.
          const edgeTypeJson = originalSchema.getEdge(eType).dumpToGSQLJson();
          if (edgeTypeJson.Config && edgeTypeJson.Config['REVERSE_EDGE']) {
            dropEdgeTypes.push(edgeTypeJson.Config['REVERSE_EDGE']);
          }
        });
        schemaChange.dropEdgeTypes = dropEdgeTypes;
      });
    }

    return result;
  }

  /**
   * Generate the schema change object based on schema change history.
   *
   * The history is replayed step by step while two index mappings are maintained (one for vertex
   * types, one for edge types). Entry k of a mapping is the index, in the original schema, of the
   * k-th type of the schema at the current step, or -1 when that type has no counterpart in the
   * original schema. After the replay the mappings describe the final schema and are turned into
   * the add / drop / alter sections of the result.
   *
   * ASSUMPTIONS:
   * 1. In each schema change step, only one vertex or edge can be modified.
   *    (the most important rule cannot be broken)
   * 2. In each schema change step, one or more vertex or edge types can be created.
   *    New vertex and edge types are pushed to the end of the vertex type list and edge type list.
   * 3. In each schema change step, one or more vertex or edge types can be removed.
   *
   * @param {History<Graph>} schemaChangeHistory the recorded schema snapshots; record 0 is the
   *        original schema and the last record is the final schema
   * @returns {SchemaChange} a change object of type `SchemaChangeType.Raw`; all sections are empty
   *          when the history holds no record
   * @memberof SchemaChangeLogicService
   */
  getSchemaChange(schemaChangeHistory: History<Graph>): SchemaChange {
    // Get an empty schema change object.
    const schemaChange: SchemaChange = {
      schemaChangeType: SchemaChangeType.Raw,
      alterEdgeTypes: [],
      alterVertexTypes: [],
      dropEdgeTypes: [],
      dropVertexTypes: [],
      addVertexTypes: [],
      addEdgeTypes: [],
    };

    // The schema change history cannot be empty.
    if (schemaChangeHistory.getHistoryLength() < 1) {
      return schemaChange;
    }

    // GSQL JSON dumps of the original schema and final schema.
    // NOTE(review): finalSchema entries are mutated further down (IndexName is assigned) and pushed
    // into the result, so this relies on dumpToGSQLJson() returning a fresh object — confirm.
    const originalSchema = schemaChangeHistory.getRecord(0).dumpToGSQLJson();
    const finalSchema = schemaChangeHistory.getRecord(schemaChangeHistory.getHistoryLength() - 1).dumpToGSQLJson();

    // Records the vertex types of current schema map to the vertex types in original schema.
    // -1 means it is a new vertex type.
    // Starts out as the identity mapping [0, 1, 2, ...].
    const mappingToVertexTypesInOriginalSchema = originalSchema.VertexTypes.map((__, i) => i);
    // Records the edge types of current schema map to the edge types in original schema.
    // -1 means it is a new edge type.
    const mappingToEdgeTypesInOriginalSchema = originalSchema.EdgeTypes.map((__, i) => i);

    // Replay the schema change history and generate the schema change step by step.
    for (let i = 1; i < schemaChangeHistory.getHistoryLength(); i++) {
      const prevSchema = schemaChangeHistory.getRecord(i - 1).dumpToGSQLJson();
      const nextSchema = schemaChangeHistory.getRecord(i).dumpToGSQLJson();

      // Skip the step if the schema is not changed (only change schema style).
      if (isEqual(prevSchema, nextSchema)) {
        continue;
      }

      // If the step is adding new vertex and edge types:
      if (this.schemaChangeNotTouchExistingVertexAndEdge(prevSchema, nextSchema)) {
        // Should map the new vertex and edge types to index -1.
        // One -1 is appended per extra type, which relies on assumption 2 (new types go to the end).
        if (nextSchema.VertexTypes.length > prevSchema.VertexTypes.length) {
          for (let j = 0; j < nextSchema.VertexTypes.length - prevSchema.VertexTypes.length; j++) {
            mappingToVertexTypesInOriginalSchema.push(-1);
          }
        }
        if (nextSchema.EdgeTypes.length > prevSchema.EdgeTypes.length) {
          for (let j = 0; j < nextSchema.EdgeTypes.length - prevSchema.EdgeTypes.length; j++) {
            mappingToEdgeTypesInOriginalSchema.push(-1);
          }
        }
        continue;
      }

      // If the step is dropping some vertex and edge types:
      if (this.afterSchemaChangeAllRemainingVertexAndEdgeStaySame(prevSchema, nextSchema)) {
        // Get removed vertex type names and edge type names.
        const [removedVertexTypeNames, removedEdgeTypeNames] = this.getRemovedVertexAndEdgeTypes(
          prevSchema,
          nextSchema
        );
        // Remove the mapping if the vertex or edge type is removed.
        // Walk backwards so that a splice never shifts a position that is still to be visited.
        for (let j = prevSchema.VertexTypes.length - 1; j >= 0; j--) {
          if (removedVertexTypeNames.includes(prevSchema.VertexTypes[j].Name)) {
            mappingToVertexTypesInOriginalSchema.splice(j, 1);
          }
        }
        for (let j = prevSchema.EdgeTypes.length - 1; j >= 0; j--) {
          if (removedEdgeTypeNames.includes(prevSchema.EdgeTypes[j].Name)) {
            mappingToEdgeTypesInOriginalSchema.splice(j, 1);
          }
        }
        continue;
      }

      // If the step is altering a vertex or edge type, don't need to change the mapping.
    }

    // From here on, position i of each mapping corresponds to position i of the final schema.

    // For the edge types to be altered, if the altering cannot be performed by schema change,
    // then drop the edge and add a new one by setting the mapping to be -1.
    mappingToEdgeTypesInOriginalSchema.forEach((mapping, i) => {
      if (mapping === -1) {
        return;
      }
      if (!this.canPerformSchemaChangeForEdgeType(originalSchema.EdgeTypes[mapping], finalSchema.EdgeTypes[i])) {
        mappingToEdgeTypesInOriginalSchema[i] = -1;
      }
    });

    // For the vertex types to be altered, if the altering cannot be performed by schema change,
    // then drop the vertex type and all edge types source or target by the vertex type by setting
    // their mapping to -1, so that we will add new vertex/edge types in place of the ones we removed.
    mappingToVertexTypesInOriginalSchema.forEach((mapping, i) => {
      if (mapping === -1) {
        return;
      }
      // HasIndex and IndexName are stripped first, so a change that only touches attribute indexes
      // never forces the vertex type to be dropped and recreated.
      const vertexTypeInOriginSchema = this.excludeAttrIndex(originalSchema.VertexTypes[mapping], true);
      const vertexTypeInFinalSchema = this.excludeAttrIndex(finalSchema.VertexTypes[i], true);
      if (!this.canPerformSchemaChangeForVertexType(vertexTypeInOriginSchema, vertexTypeInFinalSchema)) {
        mappingToVertexTypesInOriginalSchema[i] = -1;
        finalSchema.EdgeTypes.forEach((edgeType, j) => {
          // The edge references the vertex type directly as its source or target.
          if (
            edgeType.FromVertexTypeName === vertexTypeInFinalSchema.Name ||
            edgeType.ToVertexTypeName === vertexTypeInFinalSchema.Name
          ) {
            mappingToEdgeTypesInOriginalSchema[j] = -1;
          }
          // The edge references the vertex type through one of its from/to pairs.
          if (edgeType.EdgePairs) {
            const pairsInvolvingVertexTypesToBeRecreated = edgeType.EdgePairs.filter(
              (pair) => pair.From === vertexTypeInFinalSchema.Name || pair.To === vertexTypeInFinalSchema.Name
            );
            if (pairsInvolvingVertexTypesToBeRecreated.length > 0) {
              mappingToEdgeTypesInOriginalSchema[j] = -1;
            }
          }
        });
      }
    });

    // Generate the schema change object based on the mapping:
    // 1. All vertex and edge types with a mapping -1 in final schema are newly created vertex and edge types;
    // 2. All vertex and edge types not mapped in original schema are dropped vertex and edge types;
    // 3. All normal types can generate an AlterVertexOrEdge object.
    mappingToVertexTypesInOriginalSchema.forEach((mapping, i) => {
      if (mapping === -1) {
        const vertexType = finalSchema.VertexTypes[i];

        // For newly added or added back vertex types, if having attribute index, generate name for it.
        // (generateAttributeIndexName is defined outside this block.)
        vertexType.Attributes.forEach((attribute) => {
          if (attribute.HasIndex) {
            attribute.IndexName = this.generateAttributeIndexName(attribute.AttributeName);
          }
        });

        schemaChange.addVertexTypes.push(finalSchema.VertexTypes[i]);
      } else {
        // Only IndexName is stripped here, so a change of HasIndex still counts as an alteration.
        const vertexTypeInOriginSchema = this.excludeAttrIndex(originalSchema.VertexTypes[mapping]);
        const vertexTypeInFinalSchema = this.excludeAttrIndex(finalSchema.VertexTypes[i]);
        if (!isEqual(vertexTypeInOriginSchema, vertexTypeInFinalSchema)) {
          // The alter item itself is computed from the unstripped definitions.
          schemaChange.alterVertexTypes.push(
            this.getAlterVertex(originalSchema.VertexTypes[mapping], finalSchema.VertexTypes[i])
          );
        }
      }
    });
    mappingToEdgeTypesInOriginalSchema.forEach((mapping, i) => {
      if (mapping === -1) {
        schemaChange.addEdgeTypes.push(finalSchema.EdgeTypes[i]);
      } else {
        if (!isEqual(originalSchema.EdgeTypes[mapping], finalSchema.EdgeTypes[i])) {
          schemaChange.alterEdgeTypes.push(
            this.getAlterEdge(originalSchema.EdgeTypes[mapping], finalSchema.EdgeTypes[i])
          );
        }
      }
    });
    // An original type whose index no longer appears in the mapping is dropped. This also covers
    // types whose mapping was overwritten with -1 above: they are dropped here and re-added above.
    originalSchema.VertexTypes.forEach((vertexType, i) => {
      if (mappingToVertexTypesInOriginalSchema.indexOf(i) === -1) {
        schemaChange.dropVertexTypes.push(vertexType.Name);
      }
    });
    originalSchema.EdgeTypes.forEach((edgeType, i) => {
      if (mappingToEdgeTypesInOriginalSchema.indexOf(i) === -1) {
        schemaChange.dropEdgeTypes.push(edgeType.Name);
      }
    });

    return schemaChange;
  }

  /**
   * Build a copy of a vertex type whose attributes have their index bookkeeping removed, so that
   * two vertex types can be compared without index details getting in the way.
   * The input vertex object is left untouched.
   *
   * @param {GSQLVertexJson} vertex the vertex type to copy
   * @param {boolean} [excludeIndexFlag] when truthy, both `HasIndex` and `IndexName` are removed
   *        from every attribute; otherwise only `IndexName` is removed
   * @returns {GSQLVertexJson} the stripped copy
   * @memberof SchemaChangeLogicService
   */
  excludeAttrIndex(vertex: GSQLVertexJson, excludeIndexFlag?: boolean): GSQLVertexJson {
    const stripped = cloneDeep(vertex);
    if (excludeIndexFlag) {
      // Drop the index flag together with the index name.
      stripped.Attributes = map(vertex.Attributes, (attribute) => omit(attribute, ['HasIndex', 'IndexName']));
    } else {
      // Keep the index flag, drop only the index name.
      stripped.Attributes = map(vertex.Attributes, (attribute) => omit(attribute, 'IndexName'));
    }
    return stripped;
  }

  /**
   * Tell whether a one-step schema change left every pre-existing vertex type and edge type
   * intact, which is what happens when the user only adds new vertex or edge types.
   * Vertex types are compared with `HasIndex` and `IndexName` stripped from their attributes.
   *
   * @private
   * @param {GSQLGraphJson} prevSchema schema before the step
   * @param {GSQLGraphJson} nextSchema schema after the step
   * @returns {boolean} true when every type of prevSchema has an identical counterpart in nextSchema
   * @memberof SchemaChangeLogicService
   */
  private schemaChangeNotTouchExistingVertexAndEdge(prevSchema: GSQLGraphJson, nextSchema: GSQLGraphJson): boolean {
    const stripIndex = (vertexType: GSQLVertexJson) => this.excludeAttrIndex(vertexType, true);

    // Vertex types of prevSchema without an identical counterpart in nextSchema.
    const touchedVertexTypes = differenceWith(
      map(prevSchema.VertexTypes, stripIndex),
      map(nextSchema.VertexTypes, stripIndex),
      isEqual
    );
    if (touchedVertexTypes.length > 0) {
      return false;
    }

    // Edge types of prevSchema without an identical counterpart in nextSchema.
    const touchedEdgeTypes = differenceWith(prevSchema.EdgeTypes, nextSchema.EdgeTypes, isEqual);
    return touchedEdgeTypes.length === 0;
  }

  /**
   * Tell whether every vertex type and edge type still present after a one-step schema change is
   * the same as before the step, which is what happens when the user only deletes vertex or edge
   * types. Vertex types are compared with `HasIndex` and `IndexName` stripped; an edge type also
   * counts as unchanged when it merely lost some of its from/to vertex type pairs.
   *
   * @private
   * @param {GSQLGraphJson} prevSchema schema before the step
   * @param {GSQLGraphJson} nextSchema schema after the step
   * @returns {boolean} true when every type of nextSchema has a matching counterpart in prevSchema
   * @memberof SchemaChangeLogicService
   */
  private afterSchemaChangeAllRemainingVertexAndEdgeStaySame(
    prevSchema: GSQLGraphJson,
    nextSchema: GSQLGraphJson
  ): boolean {
    const stripIndex = (vertexType: GSQLVertexJson) => this.excludeAttrIndex(vertexType, true);

    // Vertex types of nextSchema without an identical counterpart in prevSchema.
    const changedVertexTypes = differenceWith(
      map(nextSchema.VertexTypes, stripIndex),
      map(prevSchema.VertexTypes, stripIndex),
      isEqual
    );
    if (changedVertexTypes.length > 0) {
      return false;
    }

    // Edge types of nextSchema that match no edge type of prevSchema, pair shrinking tolerated.
    const changedEdgeTypes = differenceWith(nextSchema.EdgeTypes, prevSchema.EdgeTypes, this.edgePairShrinkNextToPrev);
    return changedEdgeTypes.length === 0;
  }

  /**
   * Comparator taking (nextEdge, prevEdge): tells whether the two edge types are equal apart from
   * their from/to vertex type pairs, with every pair of nextEdge also present in prevEdge (that is,
   * nextEdge has the same or fewer pairs than prevEdge).
   * It is passed to lodash as a bare method reference elsewhere in this class, so it must not
   * rely on `this`.
   *
   * @private
   * @param {GSQLEdgeJson} nextEdge the edge type after the change
   * @param {GSQLEdgeJson} prevEdge the edge type before the change
   * @returns {boolean}
   * @memberof SchemaChangeLogicService
   */
  private edgePairShrinkNextToPrev(nextEdge: GSQLEdgeJson, prevEdge: GSQLEdgeJson): boolean {
    const next = new Edge().loadFromGSQLJson(nextEdge);
    const prev = new Edge().loadFromGSQLJson(prevEdge);

    // Pairs that exist in next but not in prev mean the edge grew rather than shrank.
    const pairsOnlyInNext = differenceWith(next.fromToVertexTypePairs, prev.fromToVertexTypePairs, isEqual);
    if (pairsOnlyInNext.length > 0) {
      return false;
    }

    // Blank out the pairs on both sides so the final comparison covers everything else.
    prev.fromToVertexTypePairs = [];
    next.fromToVertexTypePairs = [];
    return isEqual(next, prev);
  }

  /**
   * Comparator taking (prevEdge, nextEdge): tells whether the two edge types are equal apart from
   * their from/to vertex type pairs, with every pair of nextEdge also present in prevEdge (that is,
   * nextEdge has the same or fewer pairs than prevEdge).
   * It is passed to lodash as a bare method reference elsewhere in this class, so it must not
   * rely on `this`.
   *
   * @private
   * @param {GSQLEdgeJson} prevEdge the edge type before the change
   * @param {GSQLEdgeJson} nextEdge the edge type after the change
   * @returns {boolean}
   * @memberof SchemaChangeLogicService
   */
  private edgePairShrinkPrevToNext(prevEdge: GSQLEdgeJson, nextEdge: GSQLEdgeJson): boolean {
    const next = new Edge().loadFromGSQLJson(nextEdge);
    const prev = new Edge().loadFromGSQLJson(prevEdge);

    // Pairs that exist in next but not in prev mean the edge grew rather than shrank.
    const pairsOnlyInNext = differenceWith(next.fromToVertexTypePairs, prev.fromToVertexTypePairs, isEqual);
    if (pairsOnlyInNext.length > 0) {
      return false;
    }

    // Blank out the pairs on both sides so the final comparison covers everything else.
    prev.fromToVertexTypePairs = [];
    next.fromToVertexTypePairs = [];
    return isEqual(next, prev);
  }

  /**
   * Collect the names of the vertex types and edge types that a one-step schema change removed.
   * A vertex type counts as removed when, with `HasIndex` and `IndexName` stripped, it has no
   * identical counterpart in nextSchema; an edge type counts as removed when no edge type of
   * nextSchema matches it, tolerating a shrunken set of from/to vertex type pairs.
   *
   * @private
   * @param {GSQLGraphJson} prevSchema schema before the step
   * @param {GSQLGraphJson} nextSchema schema after the step
   * @returns {[string[], string[]]} removed vertex type names, then removed edge type names
   * @memberof SchemaChangeLogicService
   */
  private getRemovedVertexAndEdgeTypes(prevSchema: GSQLGraphJson, nextSchema: GSQLGraphJson): [string[], string[]] {
    const stripIndex = (vertexType: GSQLVertexJson) => this.excludeAttrIndex(vertexType, true);

    const removedVertexTypes = differenceWith<GSQLVertexJson, GSQLVertexJson>(
      map(prevSchema.VertexTypes, stripIndex),
      map(nextSchema.VertexTypes, stripIndex),
      isEqual
    );
    const removedEdgeTypes = differenceWith<GSQLEdgeJson, GSQLEdgeJson>(
      prevSchema.EdgeTypes,
      nextSchema.EdgeTypes,
      this.edgePairShrinkPrevToNext
    );

    return [removedVertexTypes.map(({ Name }) => Name), removedEdgeTypes.map(({ Name }) => Name)];
  }

  /**
   * Decide whether going from one edge type definition to another can be expressed as a schema
   * change. Identical definitions always qualify; a change of the discriminator never does;
   * anything else is left to the shared vertex/edge check.
   *
   * @private
   * @param {GSQLEdgeJson} edgeInPrevSchema the edge type before the change
   * @param {GSQLEdgeJson} edgeInNextSchema the edge type after the change
   * @returns {boolean}
   * @memberof SchemaChangeLogicService
   */
  private canPerformSchemaChangeForEdgeType(edgeInPrevSchema: GSQLEdgeJson, edgeInNextSchema: GSQLEdgeJson): boolean {
    return (
      isEqual(edgeInPrevSchema, edgeInNextSchema) ||
      (!this.isEdgeDiscriminatorChanged(edgeInPrevSchema, edgeInNextSchema) &&
        this.canPerformSchemaChangeForVertexOrEdgeType(edgeInPrevSchema, edgeInNextSchema))
    );
  }

  /**
   * Tell whether the discriminator of an edge type differs between two definitions: either the
   * `DiscriminatorCount` values are not strictly equal, or one of the leading `DiscriminatorCount`
   * attributes differs at the same position.
   *
   * @private
   * @param {GSQLEdgeJson} edgeInPrevSchema the edge type before the change
   * @param {GSQLEdgeJson} edgeInNextSchema the edge type after the change
   * @return {boolean}
   * @memberof SchemaChangeLogicService
   */
  private isEdgeDiscriminatorChanged(edgeInPrevSchema: GSQLEdgeJson, edgeInNextSchema: GSQLEdgeJson): boolean {
    const discriminatorCount = edgeInPrevSchema.DiscriminatorCount;
    if (discriminatorCount !== edgeInNextSchema.DiscriminatorCount) {
      return true;
    }

    // A missing count means there are no discriminator attributes to compare.
    const limit = discriminatorCount ?? 0;
    let position = 0;
    while (position < limit) {
      if (!isEqual(edgeInPrevSchema.Attributes[position], edgeInNextSchema.Attributes[position])) {
        return true;
      }
      position += 1;
    }

    return false;
  }

  /**
   * Decide whether going from one vertex type definition to another can be expressed as a schema
   * change. Identical definitions always qualify; anything else is left to the shared
   * vertex/edge check.
   *
   * @private
   * @param {GSQLVertexJson} vertexInPrevSchema the vertex type before the change
   * @param {GSQLVertexJson} vertexInNextSchema the vertex type after the change
   * @returns {boolean}
   * @memberof SchemaChangeLogicService
   */
  private canPerformSchemaChangeForVertexType(
    vertexInPrevSchema: GSQLVertexJson,
    vertexInNextSchema: GSQLVertexJson
  ): boolean {
    return (
      isEqual(vertexInPrevSchema, vertexInNextSchema) ||
      this.canPerformSchemaChangeForVertexOrEdgeType(vertexInPrevSchema, vertexInNextSchema)
    );
  }

  /**
   * Decide whether the difference between two definitions of a vertex or edge type is confined to
   * their attributes. A difference anywhere else is not supported by GSQL schema change.
   * Kept as a separate method so vertex and edge handling can diverge later if needed.
   * Neither argument is modified; the comparison works on deep copies.
   *
   * @private
   * @param {(GSQLVertexJson | GSQLEdgeJson)} veInPrevSchema the type before the change
   * @param {(GSQLVertexJson | GSQLEdgeJson)} veInNextSchema the type after the change
   * @returns {boolean} true when the two definitions are equal once their attributes are ignored
   * @memberof SchemaChangeLogicService
   */
  private canPerformSchemaChangeForVertexOrEdgeType(
    veInPrevSchema: GSQLVertexJson | GSQLEdgeJson,
    veInNextSchema: GSQLVertexJson | GSQLEdgeJson
  ): boolean {
    // Deep copy with the attribute list emptied, leaving only the non-attribute part to compare.
    const withoutAttributes = (ve: GSQLVertexJson | GSQLEdgeJson) => {
      const copy = cloneDeep(ve);
      copy.Attributes = [];
      return copy;
    };

    const prevSkeleton = withoutAttributes(veInPrevSchema);
    const nextSkeleton = withoutAttributes(veInNextSchema);
    return isEqual(prevSkeleton, nextSkeleton);
  }

  /**
   * Generate the schema change item for one vertex type.
   * Here we assume the changes can be performed by GSQL schema change.
   * Currently a plain delegate to `getAlterVertexOrEdge`, which does the actual work.
   *
   * @private
   * @param {GSQLVertexJson} vertexInPrevSchema the vertex type before the change
   * @param {GSQLVertexJson} vertexInNextSchema the vertex type after the change
   * @returns {AlterVertexOrEdge} whatever `getAlterVertexOrEdge` returns for the two definitions
   * @memberof SchemaChangeLogicService
   */
  private getAlterVertex(vertexInPrevSchema: GSQLVertexJson, vertexInNextSchema: GSQLVertexJson): AlterVertexOrEdge {
    return this.getAlterVertexOrEdge(vertexInPrevSchema, vertexInNextSchema);
  }

  /**
   * Generate the schema change item for one edge type.
   * Here we assume the changes can be performed by GSQL schema change.
   *
   * @private
   * @param {GSQLEdgeJson} edgeInPrevSchema
   * @param {GSQLEdgeJson} edgeInNextSchema
   * @returns {AlterVertexOrEdge}
   * @memberof SchemaChangeLogicService
   */
  private getAlterEdge(edgeInPrevSchema: GSQLEdgeJson, edgeInNextSchema: GSQLEdgeJson): AlterVertexOrEdge {
    // Edge types have no special handling here: delegate to the shared vertex/edge routine.
    const alterEdge = this.getAlterVertexOrEdge(edgeInPrevSchema, edgeInNextSchema);
    return alterEdge;
  }

  /**
   * Generate the schema change item for one vertex type or edge type.
   * This is factored out so that vertex and edge types can later be handled differently if
   * their schema change rules ever diverge.
   *
   * @private
   * @param {(GSQLVertexJson | GSQLEdgeJson)} veInPrevSchema
   * @param {(GSQLVertexJson | GSQLEdgeJson)} veInNextSchema
   * @returns {AlterVertexOrEdge}
   * @memberof SchemaChangeLogicService
   */
  private getAlterVertexOrEdge(
    veInPrevSchema: GSQLVertexJson | GSQLEdgeJson,
    veInNextSchema: GSQLVertexJson | GSQLEdgeJson
  ): AlterVertexOrEdge {
    // Number of leading attributes skipped on both sides: the DiscriminatorCount of the prev
    // definition when that field is present (and not null/undefined), otherwise 0.
    const offset =
      'DiscriminatorCount' in veInPrevSchema ? (veInPrevSchema as GSQLEdgeJson).DiscriminatorCount ?? 0 : 0;

    const prevAttributes = veInPrevSchema.Attributes;
    const nextAttributes = veInNextSchema.Attributes;

    // nextToPrev[n] is the position (relative to `offset`) of the prev attribute that the n-th
    // next attribute corresponds to, or -1 when that next attribute has to be added.
    const nextToPrev = this.getAttributesMapping(prevAttributes.slice(offset), nextAttributes.slice(offset));

    const addAttributes: GSQLAttributeJson[] = [];
    const addIndexAttributes: AlterVertexAttributeIndex[] = [];
    const dropIndexAttributes: AlterVertexAttributeIndex[] = [];

    for (let nextPos = 0; nextPos < nextToPrev.length; nextPos++) {
      const prevPos = nextToPrev[nextPos];
      const nextAttribute = nextAttributes[offset + nextPos];

      // Brand new attribute: add it, together with a freshly named index if it asks for one.
      if (prevPos === -1) {
        addAttributes.push(nextAttribute);
        if (nextAttribute.HasIndex) {
          addIndexAttributes.push({
            attributeName: nextAttribute.AttributeName,
            indexName: this.generateAttributeIndexName(nextAttribute.AttributeName),
          });
        }
        continue;
      }

      // Surviving attribute: only its index may have been switched off or on.
      const prevAttribute = prevAttributes[offset + prevPos];
      if (prevAttribute.HasIndex && !nextAttribute.HasIndex) {
        dropIndexAttributes.push({
          attributeName: prevAttribute.AttributeName,
          indexName: prevAttribute.IndexName ?? '',
        });
      }
      if (!prevAttribute.HasIndex && nextAttribute.HasIndex) {
        addIndexAttributes.push({
          attributeName: nextAttribute.AttributeName,
          indexName: this.generateAttributeIndexName(nextAttribute.AttributeName),
        });
      }
    }

    // Every prev attribute that no next attribute maps onto gets dropped.
    const keptPrevPositions = new Set(nextToPrev);
    const dropAttributes: string[] = [];
    for (let prevIndex = offset; prevIndex < prevAttributes.length; prevIndex++) {
      if (!keptPrevPositions.has(prevIndex - offset)) {
        dropAttributes.push(prevAttributes[prevIndex].AttributeName);
      }
    }

    return {
      name: veInPrevSchema.Name,
      dropAttributes,
      addAttributes,
      dropIndexAttributes,
      addIndexAttributes,
    };
  }

  /**
   * Get the mapping indices from attributes in next schema to attributes in prev schema.
   *
   * @private
   * @param {GSQLAttributeJson[]} prevAttributes
   * @param {GSQLAttributeJson[]} nextAttributes
   * @returns {number[]}
   * @memberof SchemaChangeLogicService
   */
  private getAttributesMapping(prevAttributes: GSQLAttributeJson[], nextAttributes: GSQLAttributeJson[]): number[] {
    // Index settings are ignored when matching attributes: changing an attribute index does
    // not require dropping the attribute and adding it back.
    const indexOnlyFields = ['HasIndex', 'IndexName'];
    const comparablePrev = prevAttributes.map((attribute) => omit(attribute, indexOnlyFields));

    // For each next attribute, find the prev attribute it equals (the last one, should several
    // match), or -1 when there is none.
    const mappings = nextAttributes.map((nextAttribute) => {
      const comparableNext = omit(nextAttribute, indexOnlyFields);
      for (let position = comparablePrev.length - 1; position >= 0; position--) {
        if (isEqual(comparablePrev[position], comparableNext)) {
          return position;
        }
      }
      return -1;
    });

    // A valid mapping consists of two (optional) sections: a leading run of increasing prev
    // positions, followed by nothing but -1. E.g. [0, 2, 3, -1, -1, -1] means "drop prev
    // attribute 1 and append 3 new attributes".
    // Any other attribute modification (excluding attribute index changes) is treated as
    // dropping the attribute and adding it back, so only the leading increasing run is kept and
    // everything after it is reset to -1:
    //   [0, -1, 2, -1, -1] becomes [0, -1, -1, -1, -1]
    //   [0, 2, 1, -1, -1]  becomes [0, 2, -1, -1, -1]
    let lastUntouched = 0;
    while (
      lastUntouched < mappings.length - 1 &&
      mappings[lastUntouched] !== -1 &&
      (mappings[lastUntouched + 1] === -1 || mappings[lastUntouched] <= mappings[lastUntouched + 1])
    ) {
      lastUntouched++;
    }
    mappings.fill(-1, lastUntouched + 1);

    return mappings;
  }

  /**
   * Generate index name for newly added attribute index.
   *
   * @private
   * @param {string} attributeName
   * @returns {string}
   * @memberof SchemaChangeLogicService
   */
  private generateAttributeIndexName(attributeName: string): string {
    // Base-36 rendering of a random number with its first two characters (normally "0.") cut off.
    const randomSuffix = Math.random().toString(36).substring(2);
    return `${attributeName}_${randomSuffix}`;
  }

  /**
   * Get alter jobs that only include add attribute index changes.
   *
   * @private
   * @param {AlterVertexOrEdge[]} alterJobs
   * @returns {AlterVertexOrEdge[]}
   * @memberof SchemaChangeLogicService
   */
  private getAddAttributeIndexJobs(alterJobs: AlterVertexOrEdge[]): AlterVertexOrEdge[] {
    const indexOnlyJobs: AlterVertexOrEdge[] = [];

    // Operate on deep copies so the caller's jobs stay untouched.
    for (const job of cloneDeep(alterJobs)) {
      // Jobs without any index to add contribute nothing.
      if (job.addIndexAttributes.length > 0) {
        // Strip every section except the add-index one.
        job.addAttributes = [];
        job.dropAttributes = [];
        job.dropIndexAttributes = [];
        indexOnlyJobs.push(job);
      }
    }

    return indexOnlyJobs;
  }

  /**
   * Get alter jobs excluding add attribute index jobs.
   *
   * @private
   * @param {AlterVertexOrEdge[]} alterJobs
   * @returns {AlterVertexOrEdge[]}
   * @memberof SchemaChangeLogicService
   */
  private getAlterJobsExceptAddAttributeIndex(alterJobs: AlterVertexOrEdge[]): AlterVertexOrEdge[] {
    return (
      alterJobs
        // Keep only jobs that still have something to do once the add-index section is removed.
        .filter(
          (alterJob) =>
            alterJob.addAttributes.length > 0 ||
            alterJob.dropAttributes.length > 0 ||
            alterJob.dropIndexAttributes.length > 0
        )
        // Clear the add-index section on a deep copy, never on the caller's object: the same
        // `alterJobs` may also be used to derive the add-index jobs, and emptying
        // `addIndexAttributes` in place would make that result depend on call order.
        .map((alterJob) => {
          const jobWithoutAddIndex = cloneDeep(alterJob);
          jobWithoutAddIndex.addIndexAttributes = [];
          return jobWithoutAddIndex;
        })
    );
  }
}
