import {
  Agent,
  AgentDiagnosticMetadata,
  ApiResponseType,
  jsonPrettyPrint,
  Organization,
  RbacUnauthorizedError,
  Relay,
  RELAY_BODY_FIELD_NAME,
  ResponseMeta,
  SUPERBLOCKS_REQUEST_ID_HEADER,
  SUPERBLOCKS_AUTHORIZATION_HEADER,
  SuperblocksError,
  DATA_DOMAIN_HEADER,
} from "@superblocksteam/shared";
import { compile } from "path-to-regexp";
import { CANCEL } from "redux-saga";
import { v4 as uuidv4 } from "uuid";
import AuthProvider from "auth/auth0";
import TokenProvider from "auth/token";
import {
  SUPERBLOCKS_UI_API_ENDPOINT_BASE_URL,
  SUPERBLOCKS_UI_DATA_DOMAIN,
} from "env";
import { API_STATUS_CODES } from "legacy/constants/ApiConstants";
import { SUPPORT_EMAIL_ADDRESS } from "legacy/constants/routes";
import PerformanceTracker, {
  PerformanceName,
} from "legacy/utils/PerformanceTracker";
import { FileRequestParam } from "store/slices/apisV2/backend-types";
import { is4xx, is5xx, StatusCodes } from "utils/http";
import logger, { getUIErrorTypeByCode } from "utils/logger";

import { noActiveAgentMessage } from "../../utils/error/error";
import {
  sendAgentSystemErrorNotification,
  sendErrorUINotification,
  sendWarningUINotification,
} from "../../utils/notification";
import { AgentApiPaths } from "./agent";
import { AgentSystemError, HttpError } from "./types";

const NETWORK_ERRORS = ["Failed to fetch", "network error"];

export const buildQueryString = (query: Record<string, string | string[]>) => {
  const params = new URLSearchParams();

  Object.entries(query).forEach(([key, values]) => {
    if (Array.isArray(values)) {
      // if the same query param key has multple values
      values.forEach((value) => params.append(key, value));
    } else {
      params.append(key, values);
    }
  });
  return params.toString();
};

enum CancelCause {
  CANCELLED_BY_TIMEOUT = "CANCELLED_BY_TIMEOUT",
  CANCELLED_BY_USER = "CANCELLED_BY_USER",
}

const showNetworkError = (message: string) => {
  sendErrorUINotification({
    message: `There was an error, your work might not be saved. (${message})`,
  });
};

const show4XXError = (message: string) => {
  sendErrorUINotification({
    message: `There is something wrong with your request. (${message})`,
  });
};

const DEFAULT_HEADERS = Object.freeze({
  "Content-Type": "application/json",
});

const API_BASE_URL = SUPERBLOCKS_UI_API_ENDPOINT_BASE_URL || "/api/";

export const HttpMethod = {
  Get: "GET",
  Post: "POST",
  Put: "PUT",
  Patch: "PATCH",
  Delete: "DELETE",
} as const;

type HttpMethodName = (typeof HttpMethod)[keyof typeof HttpMethod];

interface BaseCallOptions {
  method: HttpMethodName;
  url: string;
  body?: Record<string, any>;
  query?: Record<string, string | string[]>;
  params?: Record<string, unknown>;
  timeout?: number;
  abortController?: AbortController;
  diagnosticMetadata?: AgentDiagnosticMetadata;
  responseType?: ApiResponseType;
  headers?: Record<string, string>;
}

export interface HttpCallOptions extends BaseCallOptions {
  baseUrl?: string;
  // We do not support generic file uploads- only specific /execute calls support this
  controlFlowOnlyFiles?: FileRequestParam[];
}

export interface ErrorHandlingOptions {
  notifyOnError: boolean;
  shouldCrashApp?: boolean;
  onError?: (err: HttpError) => void;
}

interface HttpAgentCallOptions extends HttpCallOptions {
  agents: Agent[];
  files?: Record<string, File>;
  organization: Organization;
  relay?: Relay;
  notifyOnSystemError?: boolean;
  headers?: Record<string, string>;
}

interface BaseFetchOptions {
  augmentedInit: RequestInit;
  timeoutId: ReturnType<typeof setTimeout>;
  body?: Record<string, any>;
  path?: string;
  diagnosticMetadata?: AgentDiagnosticMetadata;
  errorHandlingOptions?: ErrorHandlingOptions;
}

interface FetchOptions extends BaseFetchOptions {
  fullUrl: string;
}

interface StreamingFetchOptions extends FetchOptions {
  onMessage?: (message: any) => void;
  processStreamEvents?: (event: any) => void;
}

interface CallAgentWithRetryOptions extends BaseFetchOptions {
  baseUrl?: string;
  agents: Agent[];
  organization: Organization;
  notifyOnSystemError: boolean;
  query?: Record<string, string | string[]>;
  params?: Record<string, unknown>;
}

function shouldHaveBody(method: HttpMethodName) {
  return method !== HttpMethod.Get && method !== HttpMethod.Delete;
}

function joinPaths(...paths: string[]) {
  return paths
    .map((path, index) => {
      const trimmedPath = path.trim();
      if (index === paths.length - 1) {
        return trimmedPath.replace(/(^[\/]*)/g, "");
      } else if (index) {
        return trimmedPath.replace(/(^[\/]*|[\/]*$)/g, "");
      } else {
        return trimmedPath.replace(/[\/]*$/g, "");
      }
    })
    .filter((path) => path.length)
    .join("/");
}

const handleError = (
  rawError: HttpError | TypeError,
  { fullUrl, augmentedInit, errorHandlingOptions }: FetchOptions,
) => {
  const error =
    rawError instanceof TypeError
      ? // this is a network error
        // the code below expects that error is an instance of HttpError
        // so represent this network error as an HttpError with status=0 (which is a common convention)
        new HttpError(0, true, rawError.message)
      : rawError;
  //response with json parsed, but has error code
  logger.error(
    `Request to '${fullUrl} has failed
Response Status: ${error.code}
Response Status Text: ${error.message}`,
    {
      superblocks_ui_error_type: getUIErrorTypeByCode(error.code),
      superblocks_ui_error_code: error.code,
    },
  );
  //previous try catch ensured the error captured here is HttpError
  let critical = error.critical;
  // userNotified keeps track if we've notified the user of this error. It helps
  // preventing showing multiple notifications for the same error.
  let userNotified = false;
  // network error should not block user from editing api, we can just show error notification
  // we can add errors that are non-critical here
  if (fullUrl.match(/apis\/.*/) && (critical || error.code === 0)) {
    if (errorHandlingOptions?.notifyOnError) {
      userNotified = true;
      showNetworkError(`${error.code || "Network error"}: ${error.message}`);
    }
    critical = false;
  }
  // Do not crash page over commits or deployments 500 errors
  if (fullUrl.match(/\/(commits|deployment).*/) && error.code === 500) {
    critical = false;
  }

  if (
    augmentedInit.method === HttpMethod.Get &&
    fullUrl.match(/apis\/.*/) &&
    error.code === API_STATUS_CODES.RESOURCE_FORBIDDEN
  ) {
    // It is critical if we're not allowed to fetch an API.
    critical = true;
  } else if (
    fullUrl.match(/apis\/.*/) &&
    error.code === API_STATUS_CODES.RESOURCE_FORBIDDEN
  ) {
    userNotified = true;
    sendErrorUINotification({
      message: error.message,
      duration: 0, // Don't auto-dismiss since this is a very confusing error
    });
  }

  if (error.code && error.code < 500 && error.code >= 400) {
    if (error.code === 404) {
      //set to true to redirect to crash page
      critical = true;
    } else if (errorHandlingOptions?.notifyOnError && !userNotified) {
      userNotified = true;
      show4XXError(`${error.message} (${error.code})`);
    }
  }

  if (
    error.code >= 500 && // we crash app on server 500 error normally, but explicity defining shouldCrashApp can escape it
    errorHandlingOptions?.shouldCrashApp === false //
  ) {
    critical = false;
  }

  // only modifying `critical`
  const editedError = new HttpError(error.code, critical, error.message);
  errorHandlingOptions?.onError?.(editedError);
  if (errorHandlingOptions?.notifyOnError && !userNotified) {
    throw editedError;
  }
};

const getErrorMessageFromObject = (message: object) => {
  const messageJSONString = JSON.stringify(message);
  const regex = /"message"\s*:\s*"([^"]*)"/;
  const match = regex.exec(messageJSONString);
  return match ? match[1] : messageJSONString;
};

const fetchServer = <TResult>({
  augmentedInit,
  fullUrl,
  timeoutId,
  errorHandlingOptions,
}: FetchOptions): Promise<TResult> => {
  return fetch(fullUrl, augmentedInit)
    .then(async (response) => {
      try {
        if (response.status === 204) {
          return null;
        }
        const json = await response.json();

        if (response.ok && json) {
          if (json.data == null) {
            return json; // v2 API endpoints do not wrap
          }
          return json.data as TResult;
        } else {
          let message = json.responseMeta?.error?.message ?? json?.message;
          if (typeof message === "object") {
            message = getErrorMessageFromObject(message);
          }

          throw new HttpError(
            response.status,
            !response.status || response.status >= 500,
            message,
          );
        }
      } catch (e) {
        //response with json parsed, but has error code
        if (e instanceof HttpError) {
          throw e;
        }
        //response cannot be parsed to json, throwing http error as it is the cause
        throw new HttpError(
          response.status,
          !response.status || response.status >= 500,
          response.statusText,
        );
      }
    })
    .catch((rawError: HttpError | TypeError) => {
      handleError(rawError, {
        fullUrl,
        augmentedInit,
        errorHandlingOptions,
        timeoutId,
      });
    })
    .finally(() => clearTimeout(timeoutId));
};

const fetchServerStream = async ({
  augmentedInit,
  fullUrl,
  timeoutId,
  errorHandlingOptions,
  onMessage,
  processStreamEvents,
}: StreamingFetchOptions): Promise<void> => {
  const handleMessage = (message: any) => {
    if (message?.result?.event?.data) {
      onMessage && onMessage(message.result.event.data);
    } else if (message?.result?.event?.start || message?.result?.event?.end) {
      // if the message is a start or end message, we want to process it so we can show the live execution results
      processStreamEvents && processStreamEvents(message);
    }
  };

  const onComplete = () => {
    clearTimeout(timeoutId);
  };

  const onError = (error: string, statusCode?: StatusCodes) => {
    if (statusCode != null) {
      handleError(
        new HttpError(statusCode, !statusCode || statusCode >= 500, error),
        {
          fullUrl,
          augmentedInit,
          errorHandlingOptions,
          timeoutId,
        },
      );
    } else {
      handleError(new TypeError(error), {
        fullUrl,
        augmentedInit,
        errorHandlingOptions: {
          ...errorHandlingOptions,
          // these are handled by the block outputs
          // TODO(Mark) the above comment should not be true, we should not be special here
          notifyOnError: NETWORK_ERRORS.includes(error)
            ? errorHandlingOptions?.notifyOnError ?? false
            : false,
        },
        timeoutId,
      });
    }
  };

  return stream({
    url: fullUrl,
    init: augmentedInit,
    onMessage: handleMessage,
    onComplete,
    onError,
    baseUrl: "",
  });
};

const _getAgentExecuteError = function (
  body: Record<string, any>,
  fullUrl: string,
  response: Response,
): {
  title: string;
  message: string;
} {
  if (response.status === StatusCodes.TOO_MANY_REQUESTS) {
    return {
      title: `Failed to execute ${body.apiName} due to too many concurrent requests.`,
      message: `If you need to increase your concurrency execution limit, please contact Superblocks Support via ${SUPPORT_EMAIL_ADDRESS} or start a chat with one of our engineers.`,
    };
  } else {
    return {
      title: `Failed to execute ${body.apiName}. Please contact your organization administrator.`,
      message: `API execution request made to agent URL '${fullUrl}' has failed with status code ${response.status}.`,
    };
  }
};

const handleResponse = async <TResult>(
  {
    augmentedInit,
    fullUrl,
    path,
    body = {},
    diagnosticMetadata = {},
  }: FetchOptions,
  response: Response,
  notifyOnSystemError: boolean,
): Promise<TResult & AgentSystemError> => {
  if (response.ok) {
    let resp;

    if (path === AgentApiPaths.API_EXECUTE) {
      const blob = await response.blob();

      PerformanceTracker.track(PerformanceName.API_RESPONSE_RECEIVED, {
        apiId: body.apiId,
        size: blob.size,
      });

      if (blob.size > 10 * 1024 * 1024 /* 10MB */) {
        sendWarningUINotification({
          message: `Warning: Response of ${body.apiName} is ${
            Math.round((blob.size / 1024 / 1024) * 100) / 100
          }MiB which is over 10MiB. You may experience degraded performance.`,
        });
      }

      const text = await blob.text();
      resp = JSON.parse(text);
    } else {
      resp = await response.json();
    }

    return (resp.data ?? resp) as TResult & AgentSystemError;
  }
  // Request failed with a HTTP error.
  let errorTitle = `Request made to agent received an error response.`;
  let errorMsg = `Request made to agent URL '${fullUrl}' has failed with status code ${response.status}.`;

  const json = {
    responseMeta: new ResponseMeta({
      status: response.status,
      message: response.statusText,
      success: false,
    }),
    data: {},
  };
  const responseJson = await response.json();
  if (
    responseJson?.responseMeta?.error?.superblocksError ===
    SuperblocksError.RbacUnauthorized
  ) {
    throw new RbacUnauthorizedError(responseJson?.responseMeta?.message);
  }

  switch (path) {
    case AgentApiPaths.DATASOURCE_TEST: {
      errorMsg = `Datasource connection test request made to agent URL '${fullUrl}' has failed with status code ${response.status}.`;
      // Configure the alert message that gets displayed in the Integration Form
      json.data = {
        message: errorMsg,
        success: false,
      };
      break;
    }
    case AgentApiPaths.API_EXECUTE: {
      const error = _getAgentExecuteError(body, fullUrl, response);
      errorMsg = error.message;
      errorTitle = error.title;
      json.data = {
        statusCode: response.status.toString(),
        apiId: body.apiId ?? "",
        apiName: body.apiName ?? "",
        context: { globals: {}, outputs: {} },
        executionError: errorMsg,
      };
      break;
    }
    default: {
      json.data = {
        message: errorMsg,
      };
      break;
    }
  }

  // Show an error notification card to users regardless of the
  // user Api notification settings, because this is an system error
  if (notifyOnSystemError) {
    sendAgentSystemErrorNotification({
      message: errorTitle,
      description: errorMsg,
    });
  }

  // Log error here without throwing an exception
  logger.error(
    `Request to '${fullUrl} has failed
Response: ${jsonPrettyPrint(responseJson)}
JSON: ${jsonPrettyPrint(json)}`,
    diagnosticMetadata,
  );

  return {
    systemError: `${errorTitle} ${errorMsg}`,
    systemErrorCode: response.status,
  } as unknown as TResult & AgentSystemError;
};

const makeFullUrl = (
  agentUrl: string,
  path?: string,
  query?: Record<string, string | string[]>,
  params?: Record<string, unknown>,
) => {
  let fullUrl = agentUrl;
  if (path) {
    const toPath = compile(path, { encode: encodeURIComponent });
    fullUrl = joinPaths(agentUrl, toPath(params));
  }
  if (query) {
    const queryString = `?${buildQueryString(query)}`;
    fullUrl = fullUrl + queryString;
  }
  return fullUrl;
};

const callAgentInternal = async <TResult>({
  augmentedInit,
  agents,
  timeoutId,
  path,
  baseUrl,
  params,
  query,
  body = {},
  organization,
  diagnosticMetadata = {},
  notifyOnSystemError,
}: CallAgentWithRetryOptions): Promise<TResult & AgentSystemError> => {
  if (!agents.length) {
    console.warn(
      `No active agent of type ${organization.agentType} found for organization ${organization.id}`,
    );
    return {
      systemError: noActiveAgentMessage(organization.agentType),
    } as unknown as TResult & AgentSystemError;
  }
  let lastErrorMessage: string | undefined;
  let isClientError = false;
  let isServerError = false;
  if (agents.length > 0) {
    const agent = agents[Math.floor(Math.random() * agents.length)];
    const fullUrl = makeFullUrl(
      baseUrl ? baseUrl : agent.url,
      path,
      query,
      params,
    );
    const fullDiagnosticMetadata = {
      ...diagnosticMetadata,
      superblocks_agent_id: agent.id,
      superblocks_agent_version: agent.version,
      superblocks_agent_version_external: agent.versionExternal,
      superblocks_agent_environment: agent.environment,
      superblocks_agent_url: agent.url,
      superblocks_org_id: organization.id,
      superblocks_org_name: organization.name,
    };
    try {
      const response = await fetch(fullUrl, augmentedInit);
      const result = await handleResponse<TResult & AgentSystemError>(
        {
          augmentedInit,
          fullUrl,
          timeoutId,
          body,
          path,
          diagnosticMetadata: fullDiagnosticMetadata,
        },
        response,
        notifyOnSystemError,
      );
      if (!result.systemError) {
        return result;
      }

      lastErrorMessage = result.systemError;

      if (is4xx(response.status)) {
        isClientError = true;
      }

      if (is5xx(response.status)) {
        isServerError = true;
      }
    } catch (e: any) {
      // Network errors will be caught here
      switch (e.constructor.name) {
        case "AbortError":
          lastErrorMessage = `Request to ${fullUrl} has timed out.`;
          break;
        case "TypeError":
          lastErrorMessage = `Request to ${fullUrl} has failed due to a network issue: ${e.message}`;
          break;
        case RbacUnauthorizedError.name:
          lastErrorMessage = e.message;
          sendErrorUINotification({
            message: e.message,
          });
          return {
            systemError: `Failed to execute api,  Cause: ${lastErrorMessage}`,
          } as unknown as TResult & AgentSystemError;
        case "DOMException":
          if (
            e.name === "AbortError" &&
            augmentedInit?.signal?.aborted &&
            path === AgentApiPaths.API_EXECUTE
          ) {
            const reason = augmentedInit?.signal?.reason;
            // only handle api execution
            if (reason === CancelCause.CANCELLED_BY_TIMEOUT) {
              lastErrorMessage = `Request on executing api (${body.apiName}) has timed out.`;
              sendAgentSystemErrorNotification({
                message: "Request has timed out",
                description: lastErrorMessage,
              });
            }
            return {} as TResult & AgentSystemError;
          }
          lastErrorMessage = e.message ?? e.name ?? "Unknown Error.";
          break;
        default:
          lastErrorMessage = e.message ?? e.name ?? "Unknown Error.";
          break;
      }
      logger.error(
        `Request to '${fullUrl}' has failed due to a network error.
Error: ${e}`,
        fullDiagnosticMetadata,
      );
    } finally {
      clearTimeout(timeoutId);
    }
  }

  let message;
  if (agents.length === 0) {
    message = `No agent was found`;
  } else if (isClientError) {
    message = "Failed to execute the request.";
  } else if (isServerError) {
    message =
      "Superblocks may be experiencing issues. Failed to execute the request.";
  } else {
    // All agent requests failed with network error. In this case, we notify the
    // user regardless of the notification settings, because this is a system error.
    message =
      "Failed to connect to the agents. Please contact your organization administrator.";
    if (notifyOnSystemError) {
      sendAgentSystemErrorNotification({
        message,
        description: lastErrorMessage,
      });
    }
  }

  return {
    systemError: `${message} Cause: ${lastErrorMessage}`,
  } as unknown as TResult & AgentSystemError;
};

export function callAgent<TResult = void>({
  method,
  url,
  baseUrl,
  params = {},
  query,
  body = {},
  relay = {
    headers: [SUPERBLOCKS_REQUEST_ID_HEADER],
    body: [],
    query: [],
  },
  files,
  timeout = 300000,
  organization,
  agents,
  diagnosticMetadata = {},
  notifyOnSystemError = false,
  headers = {},
}: HttpAgentCallOptions): Promise<TResult & AgentSystemError> {
  const abortController = new AbortController();
  const bodyWithRelay = {
    ...body,
    [RELAY_BODY_FIELD_NAME]: relay,
  };

  const init: RequestInit = {
    credentials: "include",
    headers: {
      ...DEFAULT_HEADERS,
      ...headers,
      [SUPERBLOCKS_REQUEST_ID_HEADER]: uuidv4(),
    },
    method: method,
    signal: abortController.signal,
  };
  const timeoutId = setTimeout(
    () => abortController.abort(CancelCause.CANCELLED_BY_TIMEOUT),
    timeout,
  );

  if (files && Object.keys(files).length) {
    const formData = new FormData();
    if (shouldHaveBody(method)) {
      formData.append("body", JSON.stringify(bodyWithRelay));
    }
    Object.entries(files).forEach(([key, file]) => {
      formData.append("files", file, key);
    });
    init.body = formData;
    // Delete the application/json header
    delete (init.headers as Record<string, unknown>)["Content-Type"];
  } else if (shouldHaveBody(method)) {
    init.body = JSON.stringify(bodyWithRelay);
  }

  let promise;
  if (AuthProvider.ready()) {
    promise = AuthProvider.generateToken().then((token: string) => {
      const augmentedInit = {
        ...init,
        headers: {
          ...init.headers,
          Authorization: `Bearer ${token}`,
        },
      };

      return callAgentInternal<TResult>({
        augmentedInit,
        timeoutId,
        path: url,
        baseUrl,
        params,
        query,
        body: bodyWithRelay,
        diagnosticMetadata,
        notifyOnSystemError,
        organization,
        agents,
      });
    });
  } else {
    const augmentedInit = {
      ...init,
      headers: init.headers,
    };
    return callAgentInternal<TResult>({
      augmentedInit,
      timeoutId,
      path: url,
      params,
      query,
      body: bodyWithRelay,
      diagnosticMetadata,
      notifyOnSystemError,
      organization,
      agents,
    });
  }

  (promise as any)[CANCEL] = () =>
    abortController.abort(CancelCause.CANCELLED_BY_USER);
  return promise;
}

type CallResponseType<TResult> = HttpCallOptions extends {
  responseType: ApiResponseType.STREAM;
}
  ? Promise<void>
  : Promise<TResult>;

export function callHelper<TResult = void>(
  {
    method,
    url,
    query,
    body,
    params = {},
    controlFlowOnlyFiles,
    timeout = 300000,
    abortController: abortController_,
    baseUrl = API_BASE_URL,
    responseType,
    headers,
  }: HttpCallOptions,
  errorHandlingOptions: ErrorHandlingOptions = {
    notifyOnError: true,
  },
  onMessage?: (message: any) => void,
  processStreamEvents?: (event: any) => void,
): CallResponseType<TResult> {
  const toPath = compile(
    `${url}${responseType === ApiResponseType.STREAM ? "/stream" : ""}`,
    { encode: encodeURIComponent },
  );
  const abortController = abortController_ ?? new AbortController();
  const queryString = query ? `?${buildQueryString(query)}` : "";
  const fullUrl = joinPaths(baseUrl, toPath(params)) + queryString;
  const authorizationJwt = TokenProvider.getToken();
  const init: RequestInit = {
    credentials: "include",
    headers: {
      ...DEFAULT_HEADERS,
      ...(authorizationJwt
        ? {
            [SUPERBLOCKS_AUTHORIZATION_HEADER]: `Bearer ${authorizationJwt}`,
          }
        : {}),

      [SUPERBLOCKS_REQUEST_ID_HEADER]: uuidv4(),
      ...headers,
    },
    method: method,
    signal: abortController.signal,
  };
  const timeoutId = setTimeout(() => abortController.abort(), timeout);

  // TODO: use control flow flag
  const hasFiles = controlFlowOnlyFiles && controlFlowOnlyFiles.length > 0;
  if (hasFiles) {
    init.body = JSON.stringify({
      ...(body ?? {}),
      files: controlFlowOnlyFiles,
    });
  } else if (shouldHaveBody(method)) {
    init.body = JSON.stringify(body ?? {});
  }

  let promise;
  if (AuthProvider.ready()) {
    promise = AuthProvider.generateToken().then((token: string) => {
      const augmentedInit = {
        ...init,
        headers: {
          ...init.headers,
          Authorization: `Bearer ${token}`,
        },
      };
      if (responseType === ApiResponseType.STREAM) {
        return fetchServerStream({
          augmentedInit,
          fullUrl,
          timeoutId,
          errorHandlingOptions,
          onMessage,
          processStreamEvents,
        }) as Promise<void> as CallResponseType<TResult>;
      } else {
        return fetchServer({
          augmentedInit,
          fullUrl,
          timeoutId,
          errorHandlingOptions,
        }) as Promise<TResult> as CallResponseType<TResult>;
      }
    });
  } else {
    const augmentedInit = {
      ...init,
      headers: {
        ...init.headers,
      },
    };
    if (responseType === ApiResponseType.STREAM) {
      promise = fetchServerStream({
        augmentedInit,
        fullUrl,
        timeoutId,
        errorHandlingOptions,
        onMessage,
        processStreamEvents,
      }) as Promise<void> as CallResponseType<TResult>;
    } else {
      promise = fetchServer({
        augmentedInit,
        fullUrl,
        timeoutId,
        errorHandlingOptions,
      }) as Promise<TResult> as CallResponseType<TResult>;
    }
  }
  (promise as any)[CANCEL] = () => abortController.abort();

  return promise;
}

export function callServer<TResult = void>(
  {
    method,
    url,
    query,
    body,
    params = {},
    controlFlowOnlyFiles,
    timeout = 300000,
    abortController: abortController_,
    responseType,
    removeApiPath,
  }: Omit<HttpCallOptions, "baseUrl"> & {
    removeApiPath?: boolean; // to not use /api as root path
  },
  errorHandlingOptions: ErrorHandlingOptions = {
    notifyOnError: true,
  },
  onMessage?: (message: any) => void,
  processStreamEvents?: (event: any) => void,
): CallResponseType<TResult> {
  return callHelper(
    {
      method,
      url,
      query,
      body,
      params,
      controlFlowOnlyFiles,
      timeout,
      abortController: abortController_,
      baseUrl: removeApiPath
        ? API_BASE_URL.replace("/api/", "", API_BASE_URL)
        : API_BASE_URL,
      responseType,
      headers: {
        [DATA_DOMAIN_HEADER]: SUPERBLOCKS_UI_DATA_DOMAIN,
      },
    },
    errorHandlingOptions,
    onMessage,
    processStreamEvents,
  );
}

export const stream = async ({
  url,
  headers,
  body,
  onMessage,
  onComplete,
  onError,
  defaultError,
  method,
  baseUrl = API_BASE_URL,
  signal,
  init: overrideInit,
}: {
  url: string;
  init?: RequestInit;
  headers?: Record<string, string>;
  body?: Record<string, any>;
  onMessage: (message: any) => void;
  onComplete: () => void;
  onError: (error: string, statusCode?: StatusCodes) => void;
  defaultError?: string;
  method?: HttpMethodName;
  baseUrl?: string;
  signal?: AbortSignal;
}): Promise<void> => {
  const init: RequestInit = overrideInit ?? {
    body: JSON.stringify(body),
    headers,
    method,
  };
  try {
    const response = await fetch(`${baseUrl}${url}`, init);
    if (!response.ok) {
      try {
        const parsed = await response.json();

        throw new HttpError(
          response.status,
          !response.status || response.status >= 500,
          parsed?.error?.message ??
            parsed?.responseMeta?.error?.message ??
            defaultError,
        );
      } catch (e) {
        if (e instanceof HttpError) {
          throw e;
        }
        //response cannot be parsed to json, throwing http error as it is the cause
        throw new HttpError(
          response.status,
          !response.status || response.status >= 500,
          response.statusText,
        );
      }
    }
    if (!response.body) {
      return;
    }
    const reader = response.body.getReader();
    const decoder = new TextDecoder();
    let done = false;

    let lastChunk = "";
    while (!done) {
      if (signal && signal.aborted) {
        return; // Stop processing if the fetch was cancelled
      }
      const { value, done: readerDone } = await reader.read();
      const chunk = lastChunk + decoder.decode(value);
      const messages = chunk.split("\n");

      let jsonValues: { error?: { message: string } }[] = [];
      try {
        jsonValues = messages
          .filter(Boolean)
          .map((message) => JSON.parse(message.trim()));
        // Reset the last chunk if we were able to parse the message
        lastChunk = "";
      } catch (e) {
        // If we can't parse the message, it's probably because we only got a partial message
        // Append the chunk to the last chunk and try again next time
        lastChunk = chunk;

        // If we fail to parse and the reader is done, then we should throw an error
        // as we won't be able to parse the message
        if (readerDone) {
          throw e;
        }
      }

      for (const jsonValue of jsonValues.filter(Boolean)) {
        if (jsonValue?.error) {
          onError(jsonValue.error.message ?? defaultError);
          return;
        }
        onMessage(jsonValue);
      }

      done = readerDone && lastChunk === "";
    }
  } catch (e: HttpError | TypeError | any) {
    if (e.name === "AbortError") {
      console.log("Fetch was cancelled");
    } else {
      onError(e.message ?? defaultError, e.code);
    }
  } finally {
    onComplete();
  }
};
