/* eslint-disable no-underscore-dangle */
import type {
  ExtendObservables,
  ExtendReactivity,
  RxCollection,
  RxDatabase,
  RxDocument,
  RxDocumentBase,
  RxReplicationPullStreamItem,
  RxReplicationWriteToMasterRow,
  WithDeleted,
} from "rxdb";
import { replicateRxCollection, RxReplicationState } from "rxdb/plugins/replication";
import { Subject } from "rxjs";
import request from "graphql-request";
import { Client, createClient } from "graphql-ws";
import { Network } from "@capacitor/network";
import { PluginListenerHandle } from "@capacitor/core";
import { isEmpty, isNil } from "lodash-es";
import { Authorization, userManager } from "../hooks/useAuth";
import { getAuthHeader } from "../utils/authUtil";
import { RemoteSubmission, remoteSubmissionToLocal, Submission, submissionToRemoteHasura } from "../types/Submission";
import { getCurrentDevice } from "../utils/deviceUtil";
import { DBCollections, QUERIES } from "../utils/databaseUtil";
import { Field, FieldMeta, fieldToRemoteHasura, HasuraField, RemoteField, remoteFieldToLocal } from "../types/Field";
import { getActiveSubmissionIds, getSortedFields } from "../utils/submissionUtil";
import logger from "../utils/logger";
import {
  getFieldsForSubmissionIds,
  getSortedFieldsWithParentsFirst,
  removeOrphanedFields,
  scrubHasuraFields,
} from "../utils/fieldUtil";
import { seconds } from "../utils/timeUtil";

/** Exported batch size for document processing (not referenced by the code visible in this module section). */
export const DOCUMENT_BATCH_SIZE = 500;

// Host (and optional path) of the Hasura endpoint, injected through the environment at build/run time.
const HASURA_ENDPOINT = process.env.VITE_HASURA_ENDPOINT;

/** Endpoints used for replication: `http` for pull/push requests, `ws` for the live subscription client. */
const SYNC_URLS = {
  http: `https://${HASURA_ENDPOINT}`,
  ws: `wss://${HASURA_ENDPOINT}`,
};

/** Pull cursor for the fields replication: the `seq` of the last field document that was received. */
type FieldDocumentCheckpoint = {
  seq: number;
};

/** Item emitted on the field pull stream (fed from the GraphQL subscription). */
type FieldStream = RxReplicationPullStreamItem<Field, FieldDocumentCheckpoint>;

/** Remote field row including the `seq` column that is used to derive checkpoints. */
type RemoteFieldWithSeq = RemoteField & { seq: number };

/** Response shape of the field pull query. */
type GraphQLFieldResponse = { app_submission_fields?: WithDeleted<RemoteFieldWithSeq>[] };

/** Payload shape of the field streaming subscription. */
type GraphQLFieldStreamResponse = {
  app_submission_fields_stream?: WithDeleted<RemoteFieldWithSeq>[];
};

/** Pull cursor for the submissions replication: the `seq` of the last submission that was received. */
type SubmissionDocumentCheckpoint = {
  seq: number;
};

/** Item emitted on the submission pull stream (fed from the GraphQL subscription). */
type SubmissionStream = RxReplicationPullStreamItem<Submission, SubmissionDocumentCheckpoint>;

/** Remote submission row including the `seq` column that is used to derive checkpoints. */
type RemoteSubmissionWithSeq = RemoteSubmission & { seq: number };

/** Response shape of the submission pull query. */
type GraphQLSubmissionResponse = { app_submissions?: WithDeleted<RemoteSubmissionWithSeq>[] };

/** Payload shape of the submission streaming subscription. */
type GraphQLSubmissionStreamResponse = {
  app_submissions_stream?: WithDeleted<RemoteSubmissionWithSeq>[];
};

/** Handles of the running replications; a handle is `undefined` until its replication has been set up. */
type ReplicationState = {
  submissions?: RxReplicationState<Submission, SubmissionDocumentCheckpoint>;
  fields?: RxReplicationState<Field, FieldDocumentCheckpoint>;
};

/**
 * Replicates the local `submissions` and `fields` collections with the remote GraphQL endpoint.
 *
 * Each collection gets its own RxDB replication with a pull handler (HTTP query), a push handler
 * (HTTP mutation) and a live pull stream that is fed from a `graphql-ws` subscription.
 *
 * Lifecycle (`status`): `init` -> `starting` -> `active` -> `stopping` -> `stopped`. A failed start
 * always ends in `stopped` so that `start()` can be called again.
 */
export class GraphQLReplicator {
  /** Local database whose collections are replicated. */
  db: RxDatabase<DBCollections>;

  /** Id of this device; field stream documents carrying this id in `meta.deviceId` are ignored. */
  deviceId: string;

  /** Callback used to obtain a fresh access token when the stored user is missing or expired. */
  refreshAccessToken: () => Promise<Authorization>;

  /** Callback that is set to `false` when a start begins and to `true` once both replications are set up. */
  setInitiallySynced: (isSynced: boolean) => void;

  /** Handles of the currently configured replications. */
  state: ReplicationState;

  /** WebSocket client used for the live subscriptions. */
  subscriptionClient?: Client;

  /** Subject backing the pull stream of the submissions replication. */
  submissionSubject?: Subject<SubmissionStream>;

  /** Subject backing the pull stream of the fields replication. */
  fieldSubject?: Subject<FieldStream>;

  /** Current lifecycle status of the replicator. */
  status: "init" | "starting" | "active" | "stopping" | "stopped";

  /** Handle of the registered network status listener. */
  networkListener?: PluginListenerHandle;

  constructor(
    db: RxDatabase<DBCollections>,
    refreshAccessToken: () => Promise<Authorization>,
    setInitiallySynced: (value: boolean) => void,
    deviceId: string,
  ) {
    this.db = db;
    this.refreshAccessToken = refreshAccessToken;
    this.setInitiallySynced = setInitiallySynced;
    this.deviceId = deviceId;
    this.state = {
      submissions: undefined,
      fields: undefined,
    };
    this.subscriptionClient = undefined;
    this.status = "init";
    this.networkListener = undefined;
  }

  /**
   * Tears down the live connections, syncs as much as possible, removes the replication state and
   * then starts replication again from a clean state.
   */
  resetState = async (): Promise<void> => {
    this.status = "stopping";
    try {
      // Cancel live replication
      await this.disposeLiveConnections();

      // Force user to get as much in sync as possible
      this.state.submissions?.reSync();
      this.state.fields?.reSync();
      await this.state.submissions?.awaitInSync();
      await this.state.fields?.awaitInSync();

      // Remove replication state to force a clean state
      await this.state.fields?.remove();
      await this.state.submissions?.remove();
    } finally {
      this.status = "stopped";
    }

    // Restart replication to resynchronize everything
    await this.start();
  };

  /**
   * Starts replication unless it is already starting or active.
   *
   * Registers the network listener, waits for database leadership, refreshes the token when
   * needed, creates the subscription client and sets up both replications.
   *
   * Errors before the replications are set up (listener, leadership, token refresh) are rethrown,
   * errors during replication setup are logged. In both cases the replicator is stopped first, so
   * the status never stays on `starting` and a later `start()` is not ignored.
   */
  start = async (): Promise<void> => {
    if (this.status === "active" || this.status === "starting") {
      return; // Don't start twice
    }
    this.status = "starting";

    try {
      await this.initNetworkListener();
      await this.db.waitForLeadership();
      this.setInitiallySynced(false); // Only show banner if this instance is leader

      // Previous session might have been killed due to expired token, refresh if we have to.
      await this.refreshTokenWhenExpired();

      // Setup Subscription client
      this.subscriptionClient = createClient({
        url: SYNC_URLS.ws,
        lazy: true,
        shouldRetry: () => false, // Don't retry in case the connection dipped out. We want to re-sync when connection is reestablished.
        connectionParams: async () => {
          const authHeader = await getAuthHeader();
          return { headers: { Authorization: authHeader } };
        },
      });

      this.subscriptionClient.on("closed", async () => {
        if (this.status !== "active") {
          return;
        }
        // The client does not await this listener, so a rejection has to be handled in here;
        // a try/catch around the `on(...)` registration would never see it.
        try {
          await this.stop();
        } catch (e) {
          logger.warn("Closed event caused errors, stop replication to get out of edge-case", e);
        }
      });
    } catch (e) {
      await this.abortStart();
      throw e;
    }

    try {
      // Start Submission Replication
      this.state.submissions = await this.setupSubmissionReplication();

      // Start Field Replication
      this.state.fields = await this.setupFieldReplication();

      this.setInitiallySynced(true);
      this.status = "active";
    } catch (e) {
      logger.warn("Could not setup replication", e);
      await this.abortStart();
    }
  };

  /**
   * Releases everything a failed `start()` acquired and leaves the status on `stopped`.
   * Cleanup errors are logged instead of thrown so they cannot mask the original failure.
   */
  private async abortStart(): Promise<void> {
    try {
      await this.stop();
    } catch (e) {
      logger.warn("Could not clean up after failed start", e);
    }
  }

  /**
   * Disposes the subscription client and unregisters the network listener, when present.
   */
  private async disposeLiveConnections(): Promise<void> {
    await this.subscriptionClient?.dispose();
    await this.networkListener?.remove();
  }

  /**
   * Requests a new access token when there is no stored user or the stored user is expired.
   */
  private async refreshTokenWhenExpired(): Promise<void> {
    const user = await userManager.getUser();
    const isExpired = !user || user.expired;
    // Don't refresh access token when we don't need to
    if (isExpired) {
      await this.refreshAccessToken();
    }
  }

  /**
   * Registers a network status listener that stops replication as soon as the connection is lost
   * while the replicator is starting or active.
   */
  initNetworkListener = async (): Promise<void> => {
    this.networkListener = await Network.addListener("networkStatusChange", async (status) => {
      if (!status.connected && (this.status === "active" || this.status === "starting")) {
        await this.stop(); // Kill active session
      }
    });
  };

  /**
   * Cancels both replications, disposes the subscription client and removes the network listener.
   * The status ends on `stopped`, also when one of the cleanup steps throws.
   */
  async stop(): Promise<void> {
    this.status = "stopping";
    try {
      await this.state.submissions?.cancel();
      await this.state.fields?.cancel();
      await this.disposeLiveConnections();
    } finally {
      this.status = "stopped";
    }
  }

  /**
   * Creates the submissions replication, waits until the initial replication is done and in sync,
   * and then subscribes to remote changes starting at the last pulled checkpoint.
   *
   * @returns the replication state of the submissions collection
   */
  async setupSubmissionReplication(): Promise<RxReplicationState<Submission, SubmissionDocumentCheckpoint>> {
    this.submissionSubject = new Subject<SubmissionStream>();
    // Updated by every pull; the subscription starts streaming from this point.
    let lastCheckpoint: SubmissionDocumentCheckpoint | undefined;
    const state = replicateRxCollection<Submission, SubmissionDocumentCheckpoint>({
      replicationIdentifier: `moreapp-submissions-to-${SYNC_URLS.http}`,
      collection: this.db.submissions,
      pull: {
        batchSize: 100,
        handler: async (lastPulledCheckpoint, limit) => {
          const authHeader = await getAuthHeader();
          const checkpoint = isNil(lastPulledCheckpoint) ? { seq: 0 } : lastPulledCheckpoint;

          const result = await request<GraphQLSubmissionResponse>(
            SYNC_URLS.http,
            QUERIES.PULL.submission(checkpoint.seq === 0),
            { seq: checkpoint.seq ?? 0, limit },
            { Authorization: authHeader },
          );

          const submissions = result?.app_submissions;
          if (!submissions || isEmpty(submissions)) {
            // Set new reference point for subscriptions to start subscribing to changes
            lastCheckpoint = checkpoint;

            // No relevant changes in pull, use same checkpoint because it can't be inferred from the last document
            return { documents: [], checkpoint };
          }
          await this.removeDeletedFields(submissions);

          // Set latest document as the new checkpoint. On the next pull, we'll use this as cursor parameter
          const latestDocument = lastOfArray(submissions);
          const newCheckpoint = { seq: latestDocument.seq };

          // Set new reference point for subscriptions to start subscribing to changes
          lastCheckpoint = newCheckpoint;

          return { checkpoint: newCheckpoint, documents: submissions as any }; // type will be correct after the modifier is applied
        },
        modifier: (doc: WithDeleted<RemoteSubmission>) => remoteSubmissionToLocal(doc),
        stream$: this.submissionSubject.asObservable(),
      },
      push: {
        batchSize: 10,
        handler: async (docs: RxReplicationWriteToMasterRow<Submission>[]) => {
          try {
            const variables = await pushQuerySubmissionBuilder(this.db.fields, docs);
            const authHeader = await getAuthHeader();
            await request(SYNC_URLS.http, QUERIES.PUSH.submission, variables, {
              Authorization: authHeader,
            });
            return []; // No conflicts are reported back
          } catch (e) {
            logger.error("Couldn't push submissions", e);
            throw e;
          }
        },
      },
      live: true,
      retryTime: seconds(20),
      deletedField: "_deleted",
    });
    await state.awaitInitialReplication();
    await state.awaitInSync();
    if (!lastCheckpoint) {
      // Don't start subscription if no checkpoint is available
      logger.error("Couldn't find checkpoint for submission replication");
      return state;
    }
    this.subscriptionClient?.subscribe<GraphQLSubmissionStreamResponse>(
      {
        query: QUERIES.SUBSCRIPTION.submission,
        variables: { seq: lastCheckpoint.seq },
      },
      {
        complete: () => this.submissionSubject?.complete(),
        next: async (value) => {
          // The subscription client does not await this callback, so failures are handled here
          // instead of surfacing as unhandled rejections with a silently dropped batch.
          try {
            const submissions = value?.data?.app_submissions_stream;
            if (!submissions || isEmpty(submissions)) {
              return;
            }
            await this.removeDeletedFields(submissions);
            const lastDocument = lastOfArray(submissions);
            const checkpoint = { seq: lastDocument.seq };
            this.submissionSubject?.next({ checkpoint, documents: submissions as any }); // type will be correct after the modifier is applied
          } catch (e) {
            logger.warn("Could not process streamed submissions, requesting resync", e);
            // Let the pull handler fetch the batch that could not be processed
            this.submissionSubject?.next("RESYNC");
          }
        },
        error: (err) => this.submissionSubject?.error(err),
      },
    );
    return state;
  }

  /**
   * Creates the fields replication, waits until the initial replication is done and in sync,
   * and then subscribes to remote changes starting at the last pulled checkpoint.
   *
   * @returns the replication state of the fields collection
   */
  async setupFieldReplication(): Promise<RxReplicationState<Field, FieldDocumentCheckpoint>> {
    this.fieldSubject = new Subject<FieldStream>();
    // Updated by every pull; the subscription starts streaming from this point.
    let lastCheckpoint: FieldDocumentCheckpoint | undefined;
    const state = replicateRxCollection<Field, FieldDocumentCheckpoint>({
      replicationIdentifier: `moreapp-fields-to-${SYNC_URLS.http}`,
      collection: this.db.fields,
      pull: {
        batchSize: 500,
        handler: async (lastPulledCheckpoint, limit) => {
          const authHeader = await getAuthHeader();
          const checkpoint = isNil(lastPulledCheckpoint) ? { seq: 0 } : lastPulledCheckpoint;

          const result = await request<GraphQLFieldResponse>(
            SYNC_URLS.http,
            QUERIES.PULL.fields(),
            { seq: checkpoint.seq ?? 0, limit },
            { Authorization: authHeader },
          );
          const fields = result?.app_submission_fields;
          if (!fields || isEmpty(fields)) {
            // Set new reference point for subscriptions to start subscribing to changes
            lastCheckpoint = checkpoint;

            // No relevant changes in pull, use same checkpoint because it can't be inferred from the last document
            return { documents: [], checkpoint };
          }
          await upsertFieldMeta(fields, this.db.fieldmeta);

          // Set latest document as the new checkpoint. On the next pull, we'll use this as cursor parameter
          const lastDocument = lastOfArray(fields);
          const newCheckpoint = { seq: lastDocument.seq };

          // Set new reference point for subscriptions to start subscribing to changes
          lastCheckpoint = newCheckpoint;

          return { checkpoint: newCheckpoint, documents: fields as any }; // type will be correct after the modifier is applied
        },
        modifier: (doc: WithDeleted<RemoteField>) => remoteFieldToLocal(doc),
        stream$: this.fieldSubject.asObservable(),
      },
      push: {
        batchSize: 100,
        handler: async (docs: RxReplicationWriteToMasterRow<Field>[]) => {
          // Declared outside the try block so the catch block can report what was being pushed
          let fields: HasuraField[] = [];
          try {
            const fieldsWithUpdate = docs
              .filter((field) => field.newDocumentState)
              .map((field) => field.newDocumentState);
            const activeFields = await this.getActiveFields(fieldsWithUpdate);
            const sortedFields = await getSortedFieldsWithParentsFirst(activeFields, this.db);
            if (isEmpty(sortedFields)) {
              return []; // Modifier can filter out fields, meaning that this can be triggered but not actually have documents to push
            }
            const authHeader = await getAuthHeader();
            fields = sortedFields.map((field) => fieldToRemoteHasura(field));
            await request(SYNC_URLS.http, QUERIES.PUSH.field, { fields }, { Authorization: authHeader });
            return []; // No conflicts are reported back
          } catch (e) {
            // scrub any potential PII before sending fields to Sentry
            const scrubbedFields = scrubHasuraFields(fields);
            logger.error("Couldn't push fields", e, { extra: { fields: scrubbedFields } });
            throw e;
          }
        },
        // Final and deleted fields are not pushed by this replication
        modifier(doc: WithDeleted<Field>) {
          return doc.status === "final" || doc._deleted ? null : doc;
        },
      },
      live: true,
      retryTime: seconds(20),
      deletedField: "_deleted",
    });

    await state.awaitInitialReplication();
    await state.awaitInSync();

    if (!lastCheckpoint) {
      // Don't start subscription if no checkpoint is available
      logger.error("Couldn't find checkpoint for field replication");
      return state;
    }

    this.subscriptionClient?.subscribe<GraphQLFieldStreamResponse>(
      {
        query: QUERIES.SUBSCRIPTION.field,
        variables: { seq: lastCheckpoint.seq },
      },
      {
        complete: () => this.fieldSubject?.complete(),
        next: async (value) => {
          // The subscription client does not await this callback, so failures are handled here
          // instead of surfacing as unhandled rejections with a silently dropped batch.
          try {
            const fields = value?.data?.app_submission_fields_stream;
            if (!fields || isEmpty(fields)) {
              return;
            }
            await upsertFieldMeta(fields, this.db.fieldmeta);
            // The checkpoint is taken from the unfiltered batch so it also advances past own changes
            const lastDocument = lastOfArray(fields);
            const checkpoint = { seq: lastDocument.seq };

            // Ignore Fields that are made on this device, it can only be the current or a past revision
            const documents = fields.filter((x: RemoteField) => x.meta.deviceId !== this.deviceId);
            // Commit the changes
            this.fieldSubject?.next({ checkpoint, documents: documents as any }); // type will be correct after the modifier is applied
          } catch (e) {
            logger.warn("Could not process streamed fields, requesting resync", e);
            // Let the pull handler fetch the batch that could not be processed
            this.fieldSubject?.next("RESYNC");
          }
        },
        error: (err) => this.fieldSubject?.error(err),
      },
    );

    return state;
  }

  /**
   * Narrows the given fields down to the ones that belong to an active submission.
   *
   * @param fieldsWithUpdate fields that are about to be pushed
   * @returns the fields whose submission id is reported as active by `getActiveSubmissionIds`
   */
  async getActiveFields(fieldsWithUpdate: Field[]): Promise<Field[]> {
    const submissionIds = fieldsWithUpdate.map((field) => field.submissionId);
    const activeSubmissionIds = await getActiveSubmissionIds(submissionIds, this.db);
    return getFieldsForSubmissionIds(fieldsWithUpdate, activeSubmissionIds);
  }

  /**
   * The fields could already be completely gone from our database, so it won't appear in the Field sync.
   * Remove those fields here, to keep the local database clean.
   * Also clear the metadata, it's not needed anymore once the field is removed locally.
   * Individual removal failures are ignored (`Promise.allSettled`).
   * @param submissions newly synced submissions
   */
  async removeDeletedFields(submissions: WithDeleted<RemoteSubmission>[]): Promise<void> {
    const submissionIds = submissions.filter((submission) => submission._deleted).map((submission) => submission.id);
    if (isEmpty(submissionIds)) {
      return;
    }
    if (!this.db?.fields) {
      return;
    }
    // Remove fields of deleted submissions
    const fields = await this.db.fields.find().where("submissionId").in(submissionIds).exec();
    await Promise.allSettled(
      fields
        .filter((field) => !field.deleted)
        .map(async (field) => {
          await field.getLatest().remove();
        }),
    );

    // Remove the metadata of those fields as well
    const fieldMeta = await this.db.fieldmeta.find().where("submissionId").in(submissionIds).exec();
    await Promise.allSettled(fieldMeta.filter((meta) => !meta.deleted).map(async (meta) => meta.getLatest().remove()));
  }
}

/**
 * Builds the variables for the submission push mutation.
 *
 * Side effects per pushed row:
 * - deleted submission: its local fields are removed from `collection`;
 * - final (non-deleted) submission: its local fields are patched to "final" and included in the result.
 *
 * @param collection local fields collection
 * @param rows rows handed to the push handler
 * @returns the remote submissions plus the (possibly empty) list of finalized remote fields
 */
const pushQuerySubmissionBuilder = async (
  collection: RxCollection<Field>,
  rows: RxReplicationWriteToMasterRow<Submission>[],
): Promise<{ submissions: RemoteSubmission[]; fields: HasuraField[] }> => {
  const finalizedFields: Field[] = [];

  // Handles the local fields of a single pushed submission.
  const processRow = async (row: RxReplicationWriteToMasterRow<Submission>): Promise<void> => {
    const submission = row.newDocumentState;

    if (submission._deleted) {
      const obsoleteFields = await getFieldsForSubmission(collection, submission.id);
      await collection.bulkRemove(obsoleteFields.map((field) => field.id));
      return;
    }

    if (submission.status === "final") {
      const submissionFields = await getFieldsForSubmission(collection, submission.id);
      const patchedFields = await finalizeLocalFields(submissionFields);
      finalizedFields.push(...patchedFields);
    }
  };

  // All rows are processed concurrently
  await Promise.all(rows.map((row) => processRow(row)));

  const submissions = rows
    .filter((row) => row.newDocumentState)
    .map((row) => submissionToRemoteHasura(row.newDocumentState));

  // only when the submission is "final", we include all the fields
  if (isEmpty(finalizedFields)) {
    return { submissions, fields: [] };
  }

  const orderedFields = getSortedFields(removeOrphanedFields(finalizedFields));
  return {
    submissions,
    fields: orderedFields.map((field) => fieldToRemoteHasura(field, "final")),
  };
};

/**
 * Patches every given field document to status "final" and stamps it with the id of the current device.
 *
 * @param localFields field documents to finalize
 * @returns the documents returned by the patches, in the same order as `localFields`
 */
const finalizeLocalFields = async (
  localFields: RxDocument<Field>[],
): Promise<
  (RxDocumentBase<Field, {}, unknown> & Field & ExtendObservables<Field> & ExtendReactivity<Field, unknown>)[]
> => {
  const currentDevice = await getCurrentDevice();
  const deviceId = currentDevice.id;

  // All patches are started before any of them is awaited
  const pendingPatches = localFields.map(async (doc) => doc.incrementalPatch({ status: "final", deviceId }));
  return Promise.all(pendingPatches);
};

/**
 * Upserts one metadata entry (flagged `remote: true`) for every field received from the remote.
 *
 * @param fields fields received from a pull or from the subscription
 * @param fieldMeta collection holding the field metadata
 */
const upsertFieldMeta = async (fields: RemoteFieldWithSeq[], fieldMeta: RxCollection<FieldMeta>): Promise<void> => {
  await fieldMeta.bulkUpsert(
    fields.map(({ id, submissionId }) => ({
      id,
      submissionId,
      remote: true,
    })),
  );
};

/**
 * Looks up all field documents in `collection` whose `submissionId` equals the given id.
 *
 * @param collection local fields collection
 * @param submissionId id of the submission the fields belong to
 * @returns the matching field documents
 */
const getFieldsForSubmission = async (
  collection: RxCollection<Field>,
  submissionId: string,
): Promise<RxDocument<Field>[]> => {
  const fieldsOfSubmission = collection.find().where("submissionId").eq(submissionId);
  return fieldsOfSubmission.exec();
};

/**
 * Returns the final element of `ar`.
 * Callers are expected to pass a non-empty array; for an empty array the runtime value is `undefined`.
 */
const lastOfArray = <T>(ar: T[]): T => {
  const [tail] = ar.slice(-1);
  return tail;
};
