/* Types */
import { useCloudApi, useFlinkApi } from '@/composables/cloudApi'
import { useAnalytics } from '@/composables/analytics'
import type {
  CloudV1alpha1PoolMemberReference,
  V1alpha1BookkeeperNodeResource,
  V1alpha1BookKeeperSetReference,
  V1alpha1DefaultNodeResource,
  V1alpha1FlinkCluster,
  V1alpha1PulsarCluster,
  V1alpha1AuditLog
} from '@streamnative/cloud-api-client-typescript'
import { i18n } from '@/lang'
import type { ComputedRef, Ref } from 'vue'
import { cpuParser, memoryParser } from 'k8s-resource-parser'
import AsyncLock from 'async-lock'
import { useIntervalFn } from '@vueuse/core'
import { getErrorMessage } from '@/utils/apiHelper'
import type { MetricsData } from '@/utils/charts'
import type { PulsarState } from './usePulsarState'

// Last organization seen by this module; starts unset.
// NOTE(review): no read or write of this variable is visible in this part of the file — confirm usage.
let lastOrg: string | undefined = undefined

// Translation function of the global i18n instance; used below for user-facing error messages.
const { t } = i18n.global
// Module-wide AsyncLock instance.
// NOTE(review): no acquire call is visible in this part of the file — confirm what it guards.
const lock = new AsyncLock()

// Memory (in bytes) that corresponds to one resource unit; divisor in getCu/getSu.
const memoryPerUnit = 8589934592 // 8gb
const cpuPerUnit = 2000 // 2 full cpu
// see here https://github.com/streamnative/cloud-manager/issues/2974 for details
// CPU divisor actually used by getCu/getSu (applied to the cpuParser result / node-type cpu value).
const cpuPerUnitRaw = 2
const maxBrokerCu = 8
const maxBookieSu = 8
// Multiplied by the unit count and divided by bkReplFactor in getClusterResource to estimate throughput.
const throughputPerSu = 125
// Divisor applied to the throughput estimate in getClusterResource.
const bkReplFactor = 3
const memoryGbPerSu = 8
const brokerMinimumNodeCount = 2
const bookieMinimumNodeCount = 3
const brokerMaximumNodeCount = 15
const bookieMaximumNodeCount = 15
const minThroughput = 5
const minBrokerCu = 0.2
const minFreeBrokerCu = 0.1
const minBookieSu = 0.2
// Smallest of the broker/bookie minimum unit sizes.
const minResourceUnit = Math.min(minBrokerCu, minBookieSu)
const minFreeCU = 0.0298
// Resource preset with a single broker node at the minimum free broker CU and no bookie
// sizing (bookieSU / bookieNodeCount are left undefined).
// NOTE(review): presumably the preset for free-tier clusters — confirm against callers.
const freeResources = {
  brokerCU: minFreeBrokerCu,
  brokerNodeCount: 1,
  bookieSU: undefined as undefined | number,
  bookieNodeCount: undefined as undefined | number,
  config: {} as ClusterConfig
}
/**
 * Predefined cluster sizing options, ordered by ascending throughput.
 *
 * Every entry carries the same set of fields: a throughput figure, a user-facing description,
 * broker/bookie node counts, broker CU / bookie SU sizes and an (initially empty) cluster config.
 * All entries define `config` so consumers can read `option.config` without an undefined check.
 */
const resourceOptions = [
  {
    throughput: 16.7,
    message:
      'For teams new to Pulsar with first production messaging use case. Supports thousands of messages/second across tens of topics.',
    brokerNodeCount: 2,
    brokerCU: 0.2,
    bookieNodeCount: 3,
    bookieSU: 0.2,
    config: {} as ClusterConfig
  },
  {
    throughput: 62.5,
    message:
      'For teams growing Pulsar usage across multiple messaging use cases. Supports tens of thousands of messages/second across hundreds of topics.',
    brokerNodeCount: 3,
    brokerCU: 0.5,
    bookieNodeCount: 3,
    bookieSU: 0.5,
    config: {} as ClusterConfig
  },
  {
    throughput: 125,
    message:
      'For large teams and companies supporting both messaging and small-scale streaming use cases. Supports hundreds of thousands of messages/second across thousands of topics.',
    brokerNodeCount: 3,
    brokerCU: 1,
    bookieNodeCount: 3,
    bookieSU: 1,
    config: {} as ClusterConfig
  },
  {
    throughput: 250,
    message:
      'For companies starting to use Pulsar across the whole organization with diverse messaging and streaming use cases. Supports hundreds of thousands of messages/second across thousands of topics.',
    brokerNodeCount: 3,
    brokerCU: 2,
    bookieNodeCount: 3,
    bookieSU: 2,
    config: {} as ClusterConfig
  },
  {
    throughput: 500,
    message:
      'For companies scaling Pulsar across the whole organization with growing messaging and streaming use cases. Supports more than a half a million messages/second across tens of thousands of topics.',
    brokerNodeCount: 3,
    brokerCU: 4,
    bookieNodeCount: 3,
    bookieSU: 4,
    // Previously missing on this entry only, which made `config` possibly undefined.
    config: {} as ClusterConfig
  },
  {
    throughput: 750,
    message:
      'For companies with serious Pulsar usage and large scale streaming. Supports millions of messages/second across tens of thousands of topics.',
    brokerNodeCount: 6,
    brokerCU: 3,
    bookieNodeCount: 6,
    bookieSU: 3,
    config: {} as ClusterConfig
  }
]

// Reactive holder for cluster-level metrics: scalar counters (producers, consumers,
// subscriptions, topics), four time-series buckets (each with time axis, data points and
// min/max values), three chart option objects and the selected time window key.
// All values start empty / zero; nothing in this part of the file populates them.
const clusterMetrics = ref({
  producers: 0,
  consumers: 0,
  subscriptions: 0,
  topics: 0,
  throughputIn: {
    time: [],
    data: new Array<MetricsData>(),
    maxValue: 0,
    minValue: 0
  },
  throughputOut: {
    time: [],
    data: new Array<MetricsData>(),
    maxValue: 0,
    minValue: 0
  },
  // Unlike the other series, backlogSize also carries a separate `value` array.
  backlogSize: {
    time: [],
    data: new Array<MetricsData>(),
    value: [],
    maxValue: 0,
    minValue: 0
  },
  storageSize: {
    time: [],
    data: new Array<MetricsData>(),
    maxValue: 0,
    minValue: 0
  },
  // NOTE(review): key is spelled "throughtputOption"; renaming it would change the object's shape
  // for any consumer, so it is left as-is here.
  throughtputOption: {},
  storageOption: {},
  backlogOption: {},
  lastTime: 'lastHours'
})

/**
 * Named broker node types with their CPU and memory sizes.
 * Declared `as const` so the `name` values can be used as a literal union type
 * (see `ClusterPayload.brokerNodeType`). getCu looks entries up by `name`.
 */
export const BROKER_OPTIONS = [
  {
    name: 'tiny-1',
    cpu: 0.2,
    memory: '256Mi'
  },
  {
    name: 'micro-1',
    cpu: 2,
    memory: '1Gi'
  },
  {
    name: 'small-1',
    cpu: 2,
    memory: '2Gi'
  },
  {
    name: 'medium-1',
    cpu: 4,
    memory: '4Gi'
  },
  {
    name: 'large-1',
    cpu: 8,
    memory: '8Gi'
  }
] as const
/**
 * Named bookie node types with their CPU, memory and disk sizes.
 * Declared `as const` so the `name` values can be used as a literal union type
 * (see `ClusterPayload.bookieNodeType`). getSu looks entries up by `name`.
 */
export const BOOKIE_OPTIONS = [
  {
    name: 'tiny-1',
    cpu: 0.2,
    memory: '256Mi',
    disk: '8Gi'
  },
  {
    name: 'micro-1',
    cpu: 1,
    memory: '1Gi',
    disk: '32Gi'
  },
  {
    name: 'small-1',
    cpu: 1,
    memory: '2Gi',
    disk: '64Gi'
  },
  {
    name: 'medium-1',
    cpu: 2,
    memory: '4Gi',
    disk: '128Gi'
  },
  {
    name: 'large-1',
    cpu: 4,
    memory: '8Gi',
    disk: '256Gi'
  }
] as const
/**
 * Named Flink node types with their CPU and memory sizes (read-only literal table).
 */
export const FLINK_OPTIONS = [
  {
    name: 'micro-1',
    cpu: 1,
    memory: '2Gi'
  },
  {
    name: 'small-1',
    cpu: 2,
    memory: '4Gi'
  }
] as const
// DNS suffix constant.
// NOTE(review): presumably identifies pool members hosted under a ".cn" domain — no usage is
// visible in this part of the file, confirm against callers.
const CN_POOLMEMBER_DNS = '.cn'
/**
 * UI-side model of a Pulsar cluster, produced by `formatClusterModel` from the API's
 * V1alpha1PulsarCluster object.
 */
export interface Cluster {
  name: string
  uid: string
  webServiceUrl?: string // 'https://${dnsName}'
  brokerServiceUrl?: string // 'pulsar+ssl://${dnsName}:6651'
  websocketServiceUrl?: string // 'wss://${dnsName}:9443'
  kopServiceUrl?: string
  // Readiness flags derived from the cluster's status conditions.
  conditions: ClusterConditions
  config?: ClusterConfig
  // True once the cluster has been marked for deletion.
  deleted: boolean
  organization: string
  instance: string
  location?: Location
  brokerVersion?: string // ie. '2.8.0.9'
  bookKeeperVersion?: string // ie. '2.8.0.9'
  isKopEnabled?: boolean
  // eslint-disable-next-line prettier/prettier
  bookieNodeType: (typeof BOOKIE_OPTIONS)[number] | undefined
  // eslint-disable-next-line prettier/prettier
  brokerNodeType: (typeof BROKER_OPTIONS)[number] | undefined
  brokerPods: number | undefined
  bookiePods: number | undefined
  // When set, getSu treats the cluster as using a shared bookie set and reports 0 SU.
  bookKeeperSetRef: V1alpha1BookKeeperSetReference | undefined
  // Explicit broker cpu/memory; takes precedence over brokerNodeType in getCu.
  brokerResources: V1alpha1DefaultNodeResource | undefined
  // Explicit bookie cpu/memory; takes precedence over bookieNodeType in getSu.
  bookieResources: V1alpha1BookkeeperNodeResource | undefined
  poolMemberRef?: CloudV1alpha1PoolMemberReference
}

/**
 * UI-side representation of a cluster's `spec.config`.
 * Values under `custom` may be held as numbers/booleans in the UI; the request builders in this
 * file convert them to strings before sending.
 */
export interface ClusterConfig {
  websocketEnabled?: boolean
  // todo: these are never used anywhere?
  transactionEnabled?: boolean
  functionEnabled?: boolean
  auditLog?: V1alpha1AuditLog
  auditLogEnabled?: boolean
  auditLogDescribing?: boolean
  auditLogProducing?: boolean
  auditLogConsuming?: boolean
  // Free-form broker settings, patched key by key under /spec/config/custom.
  custom?: {
    backlogQuotaDefaultLimitBytes?: string | number
    backlogQuotaDefaultRetentionPolicy?: string
    maxProducersPerTopic?: string | number
    maxConsumersPerTopic?: string | number
    maxConsumersPerSubscription?: string | number
    dispatchThrottlingRatePerTopicInByte?: string | number
    dispatchThrottlingRatePerTopicInMsg?: string | number
    dispatchThrottlingRatePerSubscriptionInByte?: string | number
    dispatchThrottlingRatePerSubscriptionInMsg?: string | number
    subscribeThrottlingRatePerConsumer?: string | number
    topicLevelPoliciesEnabled?: string | boolean
    snAuditLogConfig?: string
  }
  protocols?: {
    // Object is the correct type, we serialize this object
    // to a struct on the cloud-api-server end.
    // eslint-disable-next-line @typescript-eslint/ban-types
    kafka?: {}
  }
}

/**
 * Create/update payload that sizes the cluster by named node types
 * (used by `createCluster` and `updateClusterConfig`).
 */
export interface ClusterPayload {
  organization: string
  instance: string
  name: string
  location: Location
  // eslint-disable-next-line prettier/prettier
  bookieNodeType: (typeof BOOKIE_OPTIONS)[number]['name']
  // eslint-disable-next-line prettier/prettier
  brokerNodeType: (typeof BROKER_OPTIONS)[number]['name']
  // Replica counts sent as /spec/broker/replicas and /spec/bookkeeper/replicas.
  brokerPods: number
  bookiePods: number
  config?: ClusterConfig
}

/**
 * Create/update payload that sizes the cluster by explicit cpu/memory values
 * (used by `createClusterByResources` and `updateClusterConfigByResource`).
 * The bookie fields are optional; `validateAndGetClusterPayloadResource` requires them
 * for instances that are not of type 'free'.
 */
export interface ClusterResourcePayload {
  organization: string
  instance: string
  name: string
  location: Location
  brokerCpu: string
  brokerMemory: string
  brokerPods: number
  bookieCpu?: string
  bookieMemory?: string
  bookiePods?: number
  config?: ClusterConfig
}

/**
 * UI-side model of a Flink cluster as stored in `flinkClusterMap`.
 */
export interface FlinkCluster {
  name: string
  webServiceUrl?: string // 'https://${dnsName}'
  conditions: FlinkClusterConditions
  // Clusters flagged as deleted are ignored by the Flink check in deleteCluster.
  deleted: boolean
  location?: Location
  organization: string
  instance: string
}

/** Cluster location identifier, passed through unchanged as `spec.location`. */
export type Location = string

/**
 * Readiness flags of a Pulsar cluster, keyed by condition type.
 * `formatClusterModel` sets a flag to true when the matching status condition is 'True'.
 */
export interface ClusterConditions {
  Ready: boolean
  ServiceEndpointReady: boolean
  ZookeeperReady: boolean
  BookKeeperReady: boolean
  PulsarBrokerReady: boolean
  PulsarProxyReady: boolean
}

/** Readiness flags of a Flink cluster, keyed by condition type. */
export interface FlinkClusterConditions {
  Ready: boolean
  SqlGatewayReady: boolean
  FlinkClusterReady: boolean
  SqlGatewayDNSEndpointReady: boolean
}

/* State */
// Local aliases of the exported node-type tables.
const bookieOptions = BOOKIE_OPTIONS
const brokerOptions = BROKER_OPTIONS
const flinkNodeOptions = FLINK_OPTIONS
// Pulsar clusters grouped by instance name (key = spec.instanceName).
const clusterMap: Ref<Record<string, Array<Cluster>>> = ref({})
// Flink clusters grouped by instance name (key = spec.instanceName).
const flinkClusterMap: Ref<Record<string, Array<FlinkCluster>>> = ref({})

/**
 * Resource summary (CU, SU and their throughput estimates) of the cluster currently selected
 * in the Pulsar state. All four fields are undefined when no instance/cluster is selected or
 * the selected cluster cannot be found in `clusterMap`.
 */
const currentClusterResource: ComputedRef<{
  cu: number | undefined
  su: number | undefined
  cuThroughput: number | undefined
  suThroughput: number | undefined
}> = computed(() => {
  const pulsarState = usePulsarState()
  const selectedInstance = pulsarState.instance.value
  const selectedUid = pulsarState.clusterUid.value
  const candidates = clusterMap.value[selectedInstance || ''] || []
  const selectedCluster = candidates.find(candidate => candidate.uid === selectedUid)

  if (selectedInstance && selectedUid && selectedCluster) {
    return getClusterResource(selectedCluster)
  }

  return { cu: undefined, su: undefined, cuThroughput: undefined, suThroughput: undefined }
})

/**
 * Computes the CU/SU sizes of a cluster together with a throughput estimate for each.
 * A throughput estimate is undefined when the matching unit count is undefined or 0.
 */
const getClusterResource = (cluster: Cluster) => {
  // units * throughputPerSu / bkReplFactor, rounded to one decimal place
  const estimateThroughput = (units: number | undefined): number | undefined => {
    if (!units) {
      return undefined
    }
    return Math.round(((units * throughputPerSu) / bkReplFactor) * 10) / 10
  }

  const cu = getCu(cluster)
  const su = getSu(cluster)
  return {
    cu,
    su,
    cuThroughput: estimateThroughput(cu),
    suThroughput: estimateThroughput(su)
  }
}

/**
 * Returns the broker compute units (CU) of a cluster, rounded to two decimals.
 * Explicit `brokerResources` win over the named `brokerNodeType`; when neither yields a
 * size the result is undefined.
 */
const getCu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  // A unit count is the larger of the cpu share and the memory share.
  const toUnits = (cpu: number, memory: number): number => {
    const units = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(units * 100) / 100
  }

  const explicit = cluster.brokerResources
  if (explicit) {
    return toUnits(cpuParser(explicit.cpu), memoryParser(explicit.memory))
  }

  const preset = brokerOptions.find(option => option.name === cluster.brokerNodeType?.name)
  if (!preset) {
    // this most likely means error
    return undefined
  }
  return toUnits(preset.cpu, memoryParser(preset.memory))
}

/**
 * Returns the bookie storage units (SU) of a cluster, rounded to two decimals.
 * A cluster with a `bookKeeperSetRef` reports 0. Otherwise explicit `bookieResources` win
 * over the named `bookieNodeType`; when neither yields a size the result is undefined.
 */
const getSu = (cluster: Cluster): number | undefined => {
  // formula: https://streamnative.slab.com/posts/pricing-and-pricing-strategy-ff8ryce6
  // A unit count is the larger of the cpu share and the memory share.
  const toUnits = (cpu: number, memory: number): number => {
    const units = Math.max(cpu / cpuPerUnitRaw, memory / memoryPerUnit)
    return Math.round(units * 100) / 100
  }

  if (cluster.bookKeeperSetRef) {
    // using shared bookie, probably free cluster, returning 0
    return 0
  }

  const explicit = cluster.bookieResources
  if (explicit) {
    return toUnits(cpuParser(explicit.cpu), memoryParser(explicit.memory))
  }

  const preset = bookieOptions.find(option => option.name === cluster.bookieNodeType?.name)
  if (!preset) {
    // this most likely means error
    return undefined
  }
  return toUnits(preset.cpu, memoryParser(preset.memory))
}

/** Returns the instance name reported by the Pulsar state's `mustInstance()`. */
const mustGetSelectedInstance = (): string => usePulsarState().mustInstance()

/**
 * Pulsar clusters of the currently selected instance, sorted by name (ascending).
 * Returns an empty array when no instance is selected or the instance has no clusters.
 */
const clusters: ComputedRef<Array<Cluster>> = computed(() => {
  const { instance } = usePulsarState()
  if (!instance.value) {
    return []
  }
  // Sort a shallow copy: Array.prototype.sort reorders in place, which would mutate the
  // reactive array stored in clusterMap from inside a computed getter. The elements are
  // the same objects, so callers can still mutate an individual cluster.
  return [...(clusterMap.value[instance.value] ?? [])].sort((a, b) => {
    if (a.name < b.name) {
      return -1
    }
    if (a.name > b.name) {
      return 1
    }
    return 0
  })
})

/**
 * Flink clusters of the currently selected instance, sorted by name (ascending).
 * Returns an empty array when no instance is selected or the instance has no Flink clusters.
 */
const flinkClusters: ComputedRef<Array<FlinkCluster>> = computed(() => {
  const { instance } = usePulsarState()
  if (!instance.value) {
    return []
  }
  // Sort a shallow copy: Array.prototype.sort reorders in place, which would mutate the
  // reactive array stored in flinkClusterMap from inside a computed getter.
  return [...(flinkClusterMap.value[instance.value] ?? [])].sort((a, b) => {
    if (a.name < b.name) {
      return -1
    }
    if (a.name > b.name) {
      return 1
    }
    return 0
  })
})

/* Getters */
/** Sorted names of the Pulsar clusters in the selected instance (empty when none). */
const clusterNames: ComputedRef<Array<string>> = computed(() => {
  const selectedInstance = usePulsarState().instance.value
  const instanceClusters = selectedInstance ? clusterMap.value[selectedInstance] : undefined
  if (!instanceClusters) {
    return []
  }

  const names = instanceClusters.map(entry => entry.name)
  names.sort()
  return names
})
/** Sorted names of the Flink clusters in the selected instance (empty when none). */
const flinkClusterNames: ComputedRef<Array<string>> = computed(() => {
  const selectedInstance = usePulsarState().instance.value
  const instanceClusters = selectedInstance ? flinkClusterMap.value[selectedInstance] : undefined
  if (!instanceClusters) {
    return []
  }

  const names = instanceClusters.map(entry => entry.name)
  names.sort()
  return names
})
/**
 * The cluster whose uid matches the selected cluster uid; when none matches, the first
 * cluster stored for the selected instance. Undefined when no instance is selected or the
 * instance has no entry in `clusterMap`.
 */
const activeCluster: ComputedRef<Cluster | undefined> = computed(() => {
  const { instance, clusterUid } = usePulsarState()
  const instanceClusters = instance.value ? clusterMap.value[instance.value] : undefined
  if (!instanceClusters) {
    return undefined
  }

  const selected = instanceClusters.find(candidate => candidate.uid === clusterUid.value)
  return selected ? selected : instanceClusters[0]
})
/** `Ready` condition of the active cluster; false when there is no active cluster. */
const isActiveClusterReady: ComputedRef<boolean> = computed(() => {
  const current = activeCluster.value
  if (!current) {
    return false
  }
  return current.conditions.Ready
})

/** True only when the active cluster exists, has a config and that config enables functions. */
const functionEnabled: ComputedRef<boolean> = computed(() => {
  return Boolean(activeCluster.value?.config?.functionEnabled)
})

/* Mutations */
/**
 * Inserts or replaces a cluster in `clusterMap` under the given instance.
 *
 * - A cluster with the same name is replaced in place.
 * - Otherwise the cluster is appended to the instance's list. Previously the whole list was
 *   overwritten with `[cluster]`, which dropped every other cluster of the instance.
 * - An instance without a list gets a new single-element list.
 *
 * @param cluster cluster model to store
 * @param instanceName instance to store it under; defaults to the selected instance
 */
const setCluster = (cluster: Cluster, instanceName: string = mustGetSelectedInstance()) => {
  const instanceClusters = clusterMap.value[instanceName]
  if (!instanceClusters) {
    clusterMap.value[instanceName] = [cluster]
    return
  }

  const clusterIndex = instanceClusters.findIndex(
    clusterItem => clusterItem.name === cluster.name
  )
  if (clusterIndex > -1) {
    instanceClusters.splice(clusterIndex, 1, cluster)
  } else {
    instanceClusters.push(cluster)
  }
}

/**
 * Removes the Flink cluster with the given name from the selected instance's list.
 * Does nothing when the instance has no list or no cluster with that name. Previously a
 * missing name produced index -1, and `splice(-1, 1)` removed the last cluster instead.
 *
 * @param name name of the Flink cluster to remove
 */
const removeFlinkCluster = (name: string) => {
  const instanceClusters = flinkClusterMap.value[mustGetSelectedInstance()]
  if (!instanceClusters) {
    return
  }

  const clusterIndex = instanceClusters.findIndex(clusterItem => clusterItem.name === name)
  if (clusterIndex === -1) {
    return
  }
  instanceClusters.splice(clusterIndex, 1)
}

/**
 * Inserts or replaces a Flink cluster in the selected instance's list.
 * A cluster with the same name is replaced in place; an unknown cluster is appended.
 * Previously an unknown cluster produced index -1, and `splice(-1, 1, ...)` overwrote the
 * last cluster of the list; a missing list raised a TypeError.
 *
 * @param flinkCluster Flink cluster model to store
 */
const setFlinkCluster = (flinkCluster: FlinkCluster) => {
  const instanceName = mustGetSelectedInstance()
  const instanceClusters = flinkClusterMap.value[instanceName]
  if (!instanceClusters) {
    flinkClusterMap.value[instanceName] = [flinkCluster]
    return
  }

  const clusterIndex = instanceClusters.findIndex(
    clusterItem => clusterItem.name === flinkCluster.name
  )
  if (clusterIndex > -1) {
    instanceClusters.splice(clusterIndex, 1, flinkCluster)
  } else {
    instanceClusters.push(flinkCluster)
  }
}

/**
 * Flags the first cluster of the selected instance whose name equals `_cluster.name`
 * as deleted. Does nothing when no cluster matches.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const setActiveClusterDeleted = (_cluster: any) => {
  for (const candidate of clusters.value) {
    if (candidate.name === _cluster.name) {
      candidate.deleted = true
      return
    }
  }
}

/**
 * Creates a Pulsar cluster sized by node types and stores the result under `payload.instance`.
 *
 * @param payload node-type based cluster description
 * @returns the created cluster model
 * @throws Error with the duplicate-cluster message on HTTP 409, otherwise with the message
 *   produced by `getErrorMessage`
 */
const createCluster = async (payload: ClusterPayload): Promise<Cluster> => {
  try {
    const requestBody = await validateAndGetClusterPayload(payload, true)
    const response = await useCloudApi().createNamespacedPulsarCluster(
      payload.organization,
      requestBody
    )
    const created = formatClusterModel(response.data)
    setCluster(created, payload.instance)
    return created
  } catch (e) {
    // The caught value is untyped; read the HTTP status defensively.
    const status = (e as { response?: { status?: unknown } } | null | undefined)?.response?.status
    const isConflict = status === 409
    throw Error(isConflict ? t('cluster.duplicateClusterError') : getErrorMessage(e))
  }
}

/**
 * Creates a Pulsar cluster sized by explicit cpu/memory values and stores the result under
 * `payload.instance`.
 *
 * @param payload resource based cluster description
 * @returns the created cluster model
 * @throws Error with the duplicate-cluster message on HTTP 409, otherwise with the message
 *   produced by `getErrorMessage`
 */
const createClusterByResources = async (payload: ClusterResourcePayload): Promise<Cluster> => {
  try {
    const requestBody = await validateAndGetClusterPayloadResource(payload, true)
    const response = await useCloudApi().createNamespacedPulsarCluster(
      payload.organization,
      requestBody
    )
    const created = formatClusterModel(response.data)
    setCluster(created, payload.instance)
    return created
  } catch (e) {
    // The caught value is untyped; read the HTTP status defensively.
    const status = (e as { response?: { status?: unknown } } | null | undefined)?.response?.status
    const isConflict = status === 409
    throw Error(isConflict ? t('cluster.duplicateClusterError') : getErrorMessage(e))
  }
}

/**
 * Updates an existing Pulsar cluster (config, replica counts and node types) with a single
 * JSON Patch request and stores the returned cluster under `payload.instance`.
 *
 * Config handling:
 * - When `payload.config` is omitted, no config operation is generated at all (previously the
 *   JSON round-trip threw a SyntaxError on `undefined`).
 * - When `payload.config.custom` is omitted, no custom key is patched (previously
 *   `Object.entries(undefined)` threw).
 * - Falsy custom values are skipped; all others are sent as strings.
 *
 * @param payload node-type based cluster description
 * @returns the updated cluster model, with `conditions.Ready` forced to false
 * @throws Error with the message produced by `getErrorMessage` when the patch request fails
 */
const updateClusterConfig = async (payload: ClusterPayload): Promise<Cluster> => {
  const patchArray: Array<Record<string, unknown>> = []
  const clusterObj = clusterMap.value[payload.instance].find(c => c.name === payload.name)

  if (payload.config !== undefined) {
    // The containers must exist before their members can be patched.
    if (!clusterObj?.config) {
      patchArray.push({
        op: 'add',
        path: `/spec/config`,
        value: {}
      })
    }

    if (!clusterObj?.config?.custom) {
      patchArray.push({
        op: 'add',
        path: `/spec/config/custom`,
        value: {}
      })
    }

    // get rid of keys with 'undefined' values
    const numberToStringConfig = JSON.parse(JSON.stringify(payload.config))

    // 'custom' attrs numbers must be strings in request
    Object.entries(numberToStringConfig.custom ?? {}).forEach(([key, value]) => {
      if (value) {
        patchArray.push({
          op: 'replace',
          path: `/spec/config/custom/${key}`,
          value: String(value)
        })
      }
    })

    if (Object.keys(numberToStringConfig).includes('websocketEnabled')) {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/websocketEnabled`,
        value: numberToStringConfig.websocketEnabled
      })
    }
    if (numberToStringConfig.protocols) {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/protocols`,
        value: numberToStringConfig.protocols
      })
    }
    if (numberToStringConfig.auditLog) {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/auditLog`,
        value: numberToStringConfig.auditLog
      })
    } else if (clusterObj?.config?.auditLog !== undefined) {
      // Audit log was configured before but is absent from the new config: drop it.
      patchArray.push({
        op: 'remove',
        path: `/spec/config/auditLog`
      })
    }
  }

  patchArray.push({
    op: 'replace',
    path: '/spec/broker/replicas',
    value: payload.brokerPods
  })
  patchArray.push({
    op: 'replace',
    path: '/spec/bookkeeper/replicas',
    value: payload.bookiePods
  })
  patchArray.push({
    op: 'replace',
    path: '/spec/broker/resourceSpec',
    value: { nodeType: payload.brokerNodeType }
  })
  patchArray.push({
    op: 'replace',
    path: '/spec/bookkeeper/resourceSpec',
    value: { nodeType: payload.bookieNodeType }
  })
  try {
    const { data } = await useCloudApi().patchNamespacedPulsarCluster(
      payload.name,
      payload.organization,
      patchArray
    )
    const clusterModel = formatClusterModel(data)
    // for edit, sometimes k8 api returns "ready" as reconciler hasn't kicked in yet to update the
    // ready flag. So manually setting ready to false in anticipation.
    clusterModel.conditions.Ready = false
    // Store under the instance the cluster was looked up in, as createCluster does.
    setCluster(clusterModel, payload.instance)
    return clusterModel
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Updates an existing Pulsar cluster with a single JSON Patch request, sizing it either by
 * explicit cpu/memory (`ClusterResourcePayload`) or by node types (`ClusterPayload`), and
 * stores the returned cluster under `payload.instance`.
 *
 * The current cluster is read from the API first so that each operation can be emitted as
 * 'add' or 'replace' depending on whether its target already exists.
 *
 * Fixes relative to the previous version:
 * - the broker `resourceSpec` operation is now chosen from the broker's own `resourceSpec`
 *   (it used to look at the bookkeeper's);
 * - bookkeeper operations are skipped when the payload carries no bookie value, instead of
 *   sending operations without a `value`.
 *
 * @param payload resource based or node-type based cluster description
 * @returns the updated cluster model
 * @throws Error with the message produced by `getErrorMessage` when the patch request fails
 */
const updateClusterConfigByResource = async (
  payload: ClusterPayload | ClusterResourcePayload
): Promise<Cluster> => {
  const patchArray: Array<Record<string, unknown>> = []
  const clusterObj = clusterMap.value[payload.instance].find(c => c.name === payload.name)
  if (!clusterObj?.config) {
    patchArray.push({
      op: 'add',
      path: `/spec/config`,
      value: {}
    })
  }

  if (!clusterObj?.config?.custom) {
    patchArray.push({
      op: 'add',
      path: `/spec/config/custom`,
      value: {}
    })
  }

  const cluster = (
    await useCloudApi().readNamespacedPulsarCluster(payload.name, payload.organization)
  ).data

  // 'custom' attrs numbers must be strings in request
  Object.entries(payload.config?.custom || {}).forEach(([key, value]) => {
    if (value === undefined) {
      // An explicitly undefined key means "unset": remove it only if the server has it.
      if ((cluster.spec?.config?.custom ?? {})[key]) {
        patchArray.push({
          op: 'remove',
          path: `/spec/config/custom/${key}`
        })
      }
    } else {
      patchArray.push({
        op: 'replace',
        path: `/spec/config/custom/${key}`,
        value: String(value)
      })
    }
  })

  if (payload.config?.websocketEnabled !== undefined) {
    patchArray.push({
      op: 'replace',
      path: `/spec/config/websocketEnabled`,
      value: payload.config?.websocketEnabled
    })
  }
  if (payload.config?.protocols !== undefined) {
    patchArray.push({
      op: 'replace',
      path: `/spec/config/protocols`,
      value: payload.config?.protocols.kafka ? { kafka: {} } : {}
    })
  }
  if (payload.config?.auditLog !== undefined) {
    patchArray.push({
      op: 'replace',
      path: `/spec/config/auditLog`,
      value: payload.config?.auditLog
    })
  } else if (cluster.spec?.config?.auditLog !== undefined) {
    patchArray.push({
      op: 'remove',
      path: `/spec/config/auditLog`
    })
  }
  if (payload.config?.transactionEnabled !== undefined) {
    patchArray.push({
      op: 'replace',
      path: `/spec/config/transactionEnabled`,
      value: payload.config?.transactionEnabled
    })
  }

  patchArray.push({
    op: 'replace',
    path: '/spec/broker/replicas',
    value: payload.brokerPods
  })
  // bookiePods is optional on ClusterResourcePayload; an undefined value would be dropped
  // during JSON serialization and leave an operation without `value`.
  if (payload.bookiePods !== undefined) {
    patchArray.push({
      op: 'replace',
      path: '/spec/bookkeeper/replicas',
      value: payload.bookiePods
    })
  }

  if ('brokerCpu' in payload) {
    if (cluster.spec?.broker?.resources) {
      patchArray.push({
        op: 'replace',
        path: '/spec/broker/resources/cpu',
        value: payload.brokerCpu
      })
      patchArray.push({
        op: 'replace',
        path: '/spec/broker/resources/memory',
        value: payload.brokerMemory
      })
    } else {
      patchArray.push({
        op: 'add',
        path: '/spec/broker/resources',
        value: { cpu: payload.brokerCpu, memory: payload.brokerMemory }
      })
    }

    const { bookieCpu, bookieMemory } = payload
    if (cluster.spec?.bookkeeper?.resources) {
      if (bookieCpu !== undefined) {
        patchArray.push({
          op: 'replace',
          path: '/spec/bookkeeper/resources/cpu',
          value: bookieCpu
        })
      }
      if (bookieMemory !== undefined) {
        patchArray.push({
          op: 'replace',
          path: '/spec/bookkeeper/resources/memory',
          value: bookieMemory
        })
      }
    } else if (bookieCpu !== undefined || bookieMemory !== undefined) {
      patchArray.push({
        op: 'add',
        path: '/spec/bookkeeper/resources',
        value: { cpu: bookieCpu, memory: bookieMemory }
      })
    }
  } else {
    patchArray.push({
      op: cluster.spec?.broker?.resourceSpec ? 'replace' : 'add',
      path: '/spec/broker/resourceSpec',
      value: { nodeType: payload.brokerNodeType }
    })
    patchArray.push({
      op: cluster.spec?.bookkeeper?.resourceSpec ? 'replace' : 'add',
      path: '/spec/bookkeeper/resourceSpec',
      value: { nodeType: payload.bookieNodeType }
    })
  }
  try {
    const { data } = await useCloudApi().patchNamespacedPulsarCluster(
      payload.name,
      payload.organization,
      patchArray
    )
    const clusterModel = formatClusterModel(data)
    // Store under the instance the cluster was looked up in, as createCluster does.
    setCluster(clusterModel, payload.instance)
    return clusterModel
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Deletes a Flink cluster through the API and then drops it from local state.
 *
 * @param flinkCluster cluster to delete (its name and organization are used)
 * @throws Error built by `getErrorMessage` with the delete-failed notification as second argument
 */
const deleteFlinkCluster = async (flinkCluster: FlinkCluster) => {
  try {
    const flinkApi = useFlinkApi()
    await flinkApi.deleteNamespacedFlinkCluster(flinkCluster.name, flinkCluster.organization)

    removeFlinkCluster(flinkCluster.name)
  } catch (e) {
    const message = getErrorMessage(e, t('cluster.deleteClusterFailedNotification'))
    throw Error(message)
  }
}

/**
 * Creates a Flink cluster through the API and appends it to the selected instance's list.
 *
 * The list is created on demand: `flinkClusterMap` only has entries for instances that already
 * own a Flink cluster, so the first cluster of an instance used to fail with a TypeError after
 * the cluster had already been created on the server.
 *
 * @param payload object with name, organization, instance, location, replicas and nodeType
 * @throws Error with the message produced by `getErrorMessage` when validation or the request fails
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const createFlinkCluster = async (payload: any) => {
  try {
    if (payload.name.length > 15) {
      // NOTE(review): this throws a plain string which the catch below wraps via
      // getErrorMessage; kept as-is because the resulting message depends on that helper.
      throw t('flink.cluster.nameTooLarge')
    }

    const { data } = await useFlinkApi().createNamespacedFlinkCluster(payload.organization, {
      kind: 'FlinkCluster',
      metadata: {
        name: payload.name,
        namespace: payload.organization
      },
      spec: {
        instanceName: payload.instance,
        location: payload.location,
        replicas: payload.replicas,
        resource: {
          nodeType: payload.nodeType
        }
      },
      status: {}
    })

    const flinkCluster = formatFlinkClusterModel(data)
    const instanceName = mustGetSelectedInstance()
    const instanceClusters = flinkClusterMap.value[instanceName]
    if (instanceClusters) {
      instanceClusters.push(flinkCluster)
    } else {
      flinkClusterMap.value[instanceName] = [flinkCluster]
    }
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Reads a single Flink cluster from the API and writes the formatted model into local state.
 *
 * @throws Error with the message produced by `getErrorMessage` when reading or storing fails
 */
const getFlinkCluster = async ({
  flinkCluster
}: {
  flinkCluster: { name: string; organization: string }
}) => {
  try {
    const response = await useFlinkApi().readNamespacedFlinkCluster(
      flinkCluster.name,
      flinkCluster.organization
    )
    const model = formatFlinkClusterModel(response.data)

    setFlinkCluster(model)
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Lists all Flink clusters of an organization and replaces `flinkClusterMap` with the result,
 * grouped by `spec.instanceName`. Items without an instance name are skipped with a warning.
 *
 * @throws Error with the message produced by `getErrorMessage` when the request fails
 */
const getFlinkClusterMap = async ({ organization }: { organization: string }) => {
  try {
    const { data } = await useFlinkApi().listNamespacedFlinkCluster(organization)

    const cm: Record<string, Array<FlinkCluster>> = {}
    data.items.forEach(ins => {
      const instanceName = ins.spec?.instanceName
      if (!instanceName) {
        // Not a valid instance name. Pass the object as a separate argument: interpolating
        // it into the message would only print "[object Object]".
        console.warn('invalid instance name is passed in:', ins)
        return
      }

      if (!cm[instanceName]) {
        cm[instanceName] = []
      }
      cm[instanceName].push(formatFlinkClusterModel(ins))
    })

    flinkClusterMap.value = cm
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

/**
 * Deletes a Pulsar cluster, refusing when the selected instance still has Flink clusters
 * that are not flagged as deleted.
 *
 * The Flink cluster list is refreshed and awaited first. Previously the refresh was started
 * without `await`, so the check ran against stale data and a failed refresh surfaced as an
 * unhandled promise rejection.
 *
 * @param cluster cluster to delete
 * @throws Error when Flink clusters still exist, or when the refresh or delete request fails
 */
const deleteCluster = async (cluster: Cluster) => {
  // can't delete cluster if has flink clusters
  await getFlinkClusterMap(cluster)
  if (flinkClusters.value.some(flink => !flink.deleted)) {
    throw Error(t('cluster.deleteClusterFlinkError'))
  }

  await useCloudApi().deleteNamespacedPulsarCluster(cluster.name, cluster.organization)
  setActiveClusterDeleted(cluster)
}

/**
 * Lists all Pulsar clusters of an organization and replaces `clusterMap` with the result,
 * grouped by `spec.instanceName`. Items without an instance name are skipped with a warning.
 * Also calls `useAnalytics().identifyUser()` once the list has been fetched.
 */
const getClusterMap = async ({ organization }: { organization: string }) => {
  const { data } = await useCloudApi().listNamespacedPulsarCluster(organization)
  useAnalytics().identifyUser()

  const cm: Record<string, Array<Cluster>> = {}
  data.items.forEach(ins => {
    const instanceName = ins.spec?.instanceName
    if (!instanceName) {
      // Invalid instance name. Pass the object as a separate argument: interpolating it into
      // the message would only print "[object Object]".
      console.warn('invalid instance name is passed in:', ins)
      return
    }

    if (!cm[instanceName]) {
      cm[instanceName] = []
    }
    cm[instanceName].push(formatClusterModel(ins))
  })
  clusterMap.value = cm
}

/**
 * Validates and converts the UI's node-type based cluster payload to a V1alpha1PulsarCluster.
 *
 * The owning instance is read first: for instances of type 'free' the config and bookkeeper
 * sections are omitted and only a single 'tiny-1' broker is accepted.
 * Numbers inside the config are converted to strings and falsy `custom` entries are dropped.
 * An omitted `payload.config` is tolerated (it used to raise a SyntaxError from the JSON
 * round-trip); on update it leaves the existing config of a non-free cluster untouched.
 *
 * @param payload Custom object that represents a pulsar cluster in UI
 * @param isCreate true to build a minimal object for creation, false to fetch the existing
 *   cluster and apply the payload on top of it
 * @returns V1alpha1PulsarCluster our API accepts and we should be using
 * @throws Error when name or location is missing, the free-tier limits are exceeded, or the
 *   existing cluster has no spec
 */
const validateAndGetClusterPayload = async (
  payload: ClusterPayload,
  isCreate: boolean
): Promise<V1alpha1PulsarCluster> => {
  if (!payload.name) {
    throw Error(t('cluster.clusterNameIsRequired'))
  }

  if (!payload.location) {
    throw Error(t('cluster.clusterLocationIsRequired'))
  }

  const { data } = await useCloudApi().readNamespacedPulsarInstance(
    payload.instance,
    payload.organization
  )
  const isFree = data.spec?.type === 'free'

  if (isFree && (payload.brokerPods > 1 || payload.brokerNodeType !== 'tiny-1')) {
    throw Error(t('cluster.unableToCreateFreeBroker'))
  }

  // JSON.stringify(undefined) yields undefined, which JSON.parse rejects, so an omitted
  // config must bypass the round-trip.
  const configValue =
    isFree || payload.config === undefined
      ? undefined
      : JSON.parse(JSON.stringify(payload.config, (k, v) => (typeof v !== 'number' ? v : '' + v)))

  Object.entries(configValue?.custom ?? {}).forEach(([key, value]) => {
    if (!value) {
      delete configValue.custom[key]
    }
  })

  if (isCreate) {
    // For create, return minimal cluster object needed to create
    return {
      kind: 'PulsarCluster',
      metadata: {
        name: payload.name,
        namespace: payload.organization
      },
      spec: {
        instanceName: payload.instance,
        location: payload.location,
        config: configValue,
        broker: {
          replicas: payload.brokerPods,
          resourceSpec: { nodeType: payload.brokerNodeType }
        },
        bookkeeper: isFree
          ? undefined
          : {
              replicas: payload.bookiePods,
              resourceSpec: { nodeType: payload.bookieNodeType }
            }
      }
    }
  }

  // For updates, there are some immutable values, such as poolRef, and we should maintain
  // those values.  Thus we are fetching the base model and modifying what we can on top.
  const cluster = (
    await useCloudApi().readNamespacedPulsarCluster(payload.name, payload.organization)
  ).data

  if (!cluster.spec) {
    throw Error(t('cluster.specIsMissing'))
  }

  // Free clusters always get their config cleared; otherwise only overwrite the existing
  // config when the payload actually carries one.
  if (isFree || payload.config !== undefined) {
    cluster.spec.config = configValue
  }
  if (isFree) {
    // free
    cluster.spec.bookkeeper = undefined
  } else {
    // not free
    cluster.spec.bookkeeper = Object.assign(cluster.spec.bookkeeper || {}, {
      replicas: payload.bookiePods,
      resourceSpec: Object.assign(cluster.spec?.bookkeeper?.resourceSpec || {}, {
        nodeType: payload.bookieNodeType
      })
    })
  }
  cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
    replicas: payload.brokerPods,
    resourceSpec: Object.assign(cluster.spec?.broker?.resourceSpec || {}, {
      nodeType: payload.brokerNodeType
    })
  })
  return cluster
}

/**
 * Validates and converts the UI's resource based (explicit cpu/memory) cluster payload to a
 * V1alpha1PulsarCluster.
 *
 * The owning instance is read first: for instances of type 'free' the config and bookkeeper
 * sections are omitted; for all other instances the bookie cpu, memory and pod count are
 * required. Numbers inside the config are converted to strings and falsy `custom` entries
 * are dropped. An omitted `payload.config` is tolerated (it used to raise a SyntaxError from
 * the JSON round-trip); on update it leaves the existing config of a non-free cluster untouched.
 *
 * @param payload Custom object that represents a pulsar cluster in UI
 * @param isCreate true to build a minimal object for creation, false to fetch the existing
 *   cluster and apply the payload on top of it
 * @returns V1alpha1PulsarCluster our API accepts and we should be using
 * @throws Error when name or location is missing, the free-tier check fails, required bookie
 *   values are missing, or the existing cluster has no spec
 */
const validateAndGetClusterPayloadResource = async (
  payload: ClusterResourcePayload,
  isCreate: boolean
): Promise<V1alpha1PulsarCluster> => {
  if (!payload.name) {
    throw Error(t('cluster.clusterNameIsRequired'))
  }

  if (!payload.location) {
    throw Error(t('cluster.clusterLocationIsRequired'))
  }

  const { data } = await useCloudApi().readNamespacedPulsarInstance(
    payload.instance,
    payload.organization
  )
  const isFree = data.spec?.type === 'free'

  // TODO need to scope this out better
  // NOTE(review): Number() yields NaN for quantity strings with a unit suffix (e.g. '200m'),
  // in which case the cpu comparison is false — confirm the expected brokerCpu format.
  if (isFree && (Number(payload.brokerCpu) > 0.2 || payload.bookieCpu)) {
    throw Error(t('cluster.unableToCreateFreeBrokerResource')) // TODO refine this error message
  }

  // JSON.stringify(undefined) yields undefined, which JSON.parse rejects, so an omitted
  // config must bypass the round-trip.
  const configValue =
    isFree || payload.config === undefined
      ? undefined
      : JSON.parse(JSON.stringify(payload.config, (k, v) => (typeof v !== 'number' ? v : '' + v)))

  Object.entries(configValue?.custom ?? {}).forEach(([key, value]) => {
    if (!value) {
      delete configValue.custom[key]
    }
  })

  if (isCreate) {
    if (
      !isFree &&
      (payload.bookieCpu === undefined ||
        payload.bookieMemory === undefined ||
        payload.bookiePods === undefined)
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // For create, return minimal cluster object needed to create
    return {
      kind: 'PulsarCluster',
      metadata: {
        name: payload.name,
        namespace: payload.organization
      },
      spec: {
        instanceName: payload.instance,
        location: payload.location,
        config: configValue,
        broker: {
          replicas: payload.brokerPods,
          resources: {
            cpu: payload.brokerCpu,
            memory: payload.brokerMemory
          }
        },
        // The undefined checks repeat the validation above so the compiler can narrow the
        // optional bookie fields.
        bookkeeper:
          isFree ||
          payload.bookieCpu === undefined ||
          payload.bookieMemory === undefined ||
          payload.bookiePods === undefined
            ? undefined
            : {
                replicas: payload.bookiePods,
                resources: {
                  cpu: payload.bookieCpu,
                  memory: payload.bookieMemory
                }
              }
      }
    }
  }

  // For updates, there are some immutable values, such as poolRef, and we should maintain
  // those values.  Thus we are fetching the base model and modifying what we can on top.
  const cluster = (
    await useCloudApi().readNamespacedPulsarCluster(payload.name, payload.organization)
  ).data

  if (!cluster.spec) {
    throw Error(t('cluster.specIsMissing'))
  }

  // Free clusters always get their config cleared; otherwise only overwrite the existing
  // config when the payload actually carries one.
  if (isFree || payload.config !== undefined) {
    cluster.spec.config = configValue
  }
  if (isFree) {
    // free
    cluster.spec.bookkeeper = undefined
  } else {
    if (
      payload.bookieCpu === undefined ||
      payload.bookieMemory === undefined ||
      payload.bookiePods === undefined
    ) {
      throw Error(t('cluster.bookieResourcesRequired')) // TODO refine this error message
    }

    // not free
    cluster.spec.bookkeeper = Object.assign(cluster.spec.bookkeeper || {}, {
      replicas: payload.bookiePods,
      resources: Object.assign(cluster.spec?.bookkeeper?.resources || {}, {
        cpu: payload.bookieCpu,
        memory: payload.bookieMemory
      })
    })
  }
  cluster.spec.broker = Object.assign(cluster.spec.broker || {}, {
    replicas: payload.brokerPods,
    resources: Object.assign(cluster.spec?.broker?.resources || {}, {
      cpu: payload.brokerCpu,
      memory: payload.brokerMemory
    })
  })
  return cluster
}

/**
 * Converts a raw `V1alpha1PulsarCluster` API resource into the flattened `Cluster`
 * view model.
 *
 * Only the first entry of `spec.serviceEndpoints` is consulted. A cluster whose
 * endpoint list is missing or empty, or whose first endpoint has no DNS name, is
 * formatted with `webServiceUrl`, `websocketServiceUrl` and `kopServiceUrl` left
 * undefined instead of raising a `TypeError`.
 *
 * @param cluster - cluster resource to convert
 * @returns the view model built from the resource's metadata, spec and status
 * @throws Error when `metadata.uid` is missing
 */
export const formatClusterModel = (cluster: V1alpha1PulsarCluster): Cluster => {
  if (!cluster.metadata?.uid) {
    throw new Error('Cluster is missing UID')
  }

  // Collapse the reported conditions into a `type -> boolean` map. The all-false
  // defaults apply only when the status carries no conditions array at all.
  const conditions = (cluster.status?.conditions?.reduce((curr, condition) => {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    curr[condition.type] = condition.status === 'True'
    return curr
  }, {}) ?? {
    Ready: false,
    ServiceEndpointReady: false,
    ZookeeperReady: false,
    BookKeeperReady: false,
    PulsarBrokerReady: false,
    PulsarProxyReady: false
  }) as ClusterConditions

  // Resolve the first endpoint once; every access below is optional so an empty
  // endpoint list or an endpoint without a DNS name cannot throw.
  const endpoint = cluster.spec?.serviceEndpoints?.[0]
  const serviceUrl = endpoint?.dnsName
  // Bare host name: scheme prefix and the first `:<port>` occurrence are removed.
  const dnsName = serviceUrl?.replace('http://', '').replace('https://', '').replace(/:\d+/, '')
  const serviceType = endpoint?.type

  const port = useInstance().istioEnabled.value ? 443 : 9443
  return {
    name: cluster.metadata?.name as string,
    uid: cluster.metadata?.uid,
    config: cluster.spec?.config as ClusterConfig,
    conditions,
    webServiceUrl: serviceUrl,
    brokerServiceUrl:
      serviceType === 'serviceTls' ? `pulsar+ssl://${dnsName}:6651` : `pulsar://${dnsName}:6651`,
    websocketServiceUrl:
      cluster.spec?.config?.websocketEnabled && dnsName ? `wss://${dnsName}:${port}` : undefined,
    kopServiceUrl:
      !!cluster.spec?.config?.protocols?.kafka && dnsName ? `${dnsName}:9093` : undefined,
    deleted: !!cluster.metadata?.deletionTimestamp,
    location: cluster.spec?.location,
    organization: cluster.metadata?.namespace as string,
    instance: cluster.spec?.instanceName as string,
    // The version is whatever follows the first ':' of the image reference.
    brokerVersion: cluster.spec?.broker?.image?.split(':')[1],
    bookKeeperVersion: cluster.spec?.bookkeeper?.image?.split(':')[1],
    isKopEnabled: !!cluster.spec?.config?.protocols?.kafka,
    bookKeeperSetRef: cluster.spec?.bookKeeperSetRef,
    bookieNodeType: BOOKIE_OPTIONS.find(
      bo => bo.name === cluster.spec?.bookkeeper?.resourceSpec?.nodeType
    ),
    bookiePods: cluster.spec?.bookkeeper?.replicas,
    brokerNodeType: BROKER_OPTIONS.find(
      bo => bo.name === cluster.spec?.broker?.resourceSpec?.nodeType
    ),
    // Guarded like the other broker lookups so a spec without a broker section
    // yields undefined rather than throwing.
    brokerPods: cluster.spec?.broker?.replicas,
    brokerResources: cluster.spec?.broker?.resources,
    bookieResources: cluster.spec?.bookkeeper?.resources,
    poolMemberRef: cluster.spec?.poolMemberRef
  }
}
/**
 * Converts a raw `V1alpha1FlinkCluster` API resource into the `FlinkCluster` view model.
 *
 * The gateway connection string, when present, is normalised into `webServiceUrl`:
 * an `https://` prefix is added if absent and a single trailing `/` is dropped.
 * Without a connection string `webServiceUrl` is the empty string.
 *
 * @param cluster - Flink cluster resource to convert
 * @returns the view model built from the resource's metadata, spec and status
 */
export const formatFlinkClusterModel = (cluster: V1alpha1FlinkCluster): FlinkCluster => {
  const reported = cluster.status?.conditions
  // All-false defaults apply only when no conditions array is reported at all;
  // otherwise each reported condition becomes a `type -> boolean` entry.
  const conditions = (
    reported == null
      ? {
          Ready: false,
          SqlGatewayReady: false,
          FlinkClusterReady: false,
          SqlGatewayDNSEndpointReady: false
        }
      : reported.reduce((flags, { type, status }) => {
          // eslint-disable-next-line @typescript-eslint/ban-ts-comment
          // @ts-ignore
          flags[type] = status === 'True'
          return flags
        }, {})
  ) as FlinkClusterConditions

  const gateway = cluster.status?.gatewayConnectionString
  let webServiceUrl = ''
  if (gateway) {
    const withScheme = gateway.startsWith('https://') ? `${gateway}` : `https://${gateway}`
    webServiceUrl = withScheme.endsWith('/') ? withScheme.slice(0, -1) : withScheme
  }

  const { metadata, spec } = cluster
  return {
    name: metadata?.name as string,
    conditions,
    webServiceUrl,
    deleted: !!metadata?.deletionTimestamp,
    location: spec?.location,
    organization: metadata?.namespace as string,
    instance: spec?.instanceName as string
  }
}

/**
 * Locally scoped composable for pinging a cluster.
 *
 * Every 5000 ms the active cluster is re-read from the cloud API. The raw
 * resource is kept in `cluster`, and the formatted model is passed to
 * `setCluster`. A tick is skipped while no active cluster is set or while a
 * previous read for the same cluster UID still holds the lock. On failure the
 * message is stored in `error` and the interval is paused.
 *
 * @returns the raw `cluster` ref, its `conditions`, the interval controls
 *   (`resume`, `pause`, `isActive`) and the last `error` message
 */
export const usePingCluster = () => {
  const cluster = ref<V1alpha1PulsarCluster>()
  const error = ref('')
  const conditions = computed(() => cluster.value?.status?.conditions ?? [])

  const refreshActiveCluster = async () => {
    const target = activeCluster.value
    if (!target) {
      // cluster fetch may not be ready, simply retry at next interval
      return
    }
    const { name, organization, uid } = target

    // A read for this cluster is still in flight; skip this tick.
    if (lock.isBusy(uid)) {
      return
    }

    await lock.acquire(uid, async () => {
      try {
        const response = await useCloudApi().readNamespacedPulsarCluster(name, organization)
        cluster.value = response.data
        const formatted = formatClusterModel(response.data)
        setCluster(formatted, formatted.instance)
      } catch (e) {
        error.value = getErrorMessage(e)
        pause()
      }
    })
  }

  const { resume, pause, isActive } = useIntervalFn(refreshActiveCluster, 5000)

  return { cluster, conditions, resume, pause, isActive, error }
}

/**
 * Derives broker and bookie sizing from a target throughput.
 *
 * The raw resource demand is `throughput * bkReplFactor / throughputPerSu`.
 * Node counts are that demand divided by the per-node maximum, rounded up and
 * never below the configured minimum node count. Per-node units are the demand
 * split evenly across the nodes, rounded to one decimal and never below the
 * configured minimum unit.
 *
 * @param throughput - ref holding the target throughput
 * @returns the four derived counts plus everything `useResourceCalculation`
 *   computes from them
 */
export const useResourceCalculationWithThroughput = (throughput: Ref<number>) => {
  const requiredUnits = () => (throughput.value * bkReplFactor) / throughputPerSu

  // Nodes needed when a single node holds at most `maxPerNode` units.
  const nodesFor = (maxPerNode: number, minimumNodes: number) =>
    Math.max(Math.ceil(requiredUnits() / maxPerNode), minimumNodes)

  // Even share of the demand per node, one decimal place, floored at `minimumUnits`.
  const unitsPerNode = (nodes: number, minimumUnits: number) =>
    Math.max(Math.round((requiredUnits() / nodes) * 10) / 10, minimumUnits)

  const brokerNodeCount = computed<number>(() => nodesFor(maxBrokerCu, brokerMinimumNodeCount))
  const brokerCUCount = computed<number>(() => unitsPerNode(brokerNodeCount.value, minBrokerCu))
  const bookieNodeCount = computed<number>(() => nodesFor(maxBookieSu, bookieMinimumNodeCount))
  const bookieSUCount = computed<number>(() => unitsPerNode(bookieNodeCount.value, minBookieSu))

  const sizing = { brokerNodeCount, brokerCUCount, bookieNodeCount, bookieSUCount }

  return {
    ...useResourceCalculation(sizing),
    ...sizing
  }
}

/**
 * Computes aggregate capacity figures from per-node broker/bookie sizing.
 *
 * When called without arguments the sizing is read from the active cluster
 * (`activeCluster` pod counts and `currentClusterResource` units).
 *
 * @param params - refs holding node counts and per-node units for brokers and bookies
 * @returns computed totals: `cuTotal`, `suTotal`, `brokerThroughput`,
 *   `totalCPUCores`, `bookieThroughput`, `bookieMemory`, `totalThroughput`.
 *   Totals are `NaN` when the corresponding count or unit is missing or zero.
 */
export const useResourceCalculation = (
  params: {
    brokerNodeCount: Ref<number | undefined>
    brokerCUCount: Ref<number | undefined>
    bookieNodeCount: Ref<number | undefined>
    bookieSUCount: Ref<number | undefined>
  } = {
    brokerNodeCount: computed(() => activeCluster.value?.brokerPods),
    brokerCUCount: computed(() => currentClusterResource.value.cu),
    bookieNodeCount: computed(() => activeCluster.value?.bookiePods),
    bookieSUCount: computed(() => currentClusterResource.value.su)
  }
) => {
  const { brokerNodeCount, brokerCUCount, bookieNodeCount, bookieSUCount } = params

  // Rounds to a single decimal place.
  const toOneDecimal = (amount: number) => Math.round(amount * 10) / 10
  // Throughput sustained by `units` resource units.
  const throughputOf = (units: number) => toOneDecimal((units * throughputPerSu) / bkReplFactor)

  const cuTotal = computed<number>(() =>
    brokerNodeCount.value && brokerCUCount.value
      ? toOneDecimal(brokerNodeCount.value * brokerCUCount.value)
      : NaN
  )
  const suTotal = computed<number>(() =>
    bookieNodeCount.value && bookieSUCount.value
      ? toOneDecimal(bookieNodeCount.value * bookieSUCount.value)
      : NaN
  )
  const brokerThroughput = computed<number>(() => throughputOf(cuTotal.value))
  const totalCPUCores = computed<number>(() => (cuTotal.value * cpuPerUnit) / 1000)
  const bookieThroughput = computed<number>(() => throughputOf(suTotal.value))
  const bookieMemory = computed<number>(() => suTotal.value * memoryGbPerSu)
  // For free clusters the bookie is not set and its throughput is NaN, so the
  // broker figure alone is reported; otherwise the smaller of the two wins.
  const totalThroughput = computed<number>(() =>
    bookieThroughput.value
      ? Math.min(brokerThroughput.value, bookieThroughput.value)
      : brokerThroughput.value
  )

  return {
    cuTotal,
    suTotal,
    brokerThroughput,
    totalCPUCores,
    bookieThroughput,
    bookieMemory,
    totalThroughput
  }
}

/**
 * Tells whether a DNS name ends with the `CN_POOLMEMBER_DNS` suffix.
 *
 * @param dnsName - DNS name to inspect
 * @returns `true` when `dnsName` is undefined or ends with `CN_POOLMEMBER_DNS`,
 *   otherwise `false`
 */
export const isCNPoolMember = (dnsName: string | undefined) =>
  dnsName === undefined || dnsName.endsWith(CN_POOLMEMBER_DNS)

/**
 * Keeps the cluster maps in step with the selected organization and cluster.
 *
 * The handler runs once for `initialState` and again whenever the `organization`
 * or `clusterUid` from `usePulsarState()` changes. Without an organization both
 * cluster maps are emptied. Otherwise the cluster map is re-fetched when the
 * organization differs from the last one handled, or when the selected cluster
 * UID is unset or not among the known clusters.
 *
 * @param initialState - state providing the initial organization and cluster UID
 * @returns the promise of the initial handler run
 */
export const init = (initialState: PulsarState) => {
  const { organization, clusterUid } = usePulsarState()

  const valueChanged = async ([org, clusUid]: [string | undefined, string | undefined]) => {
    if (org) {
      const activeClusterKnown = !!clusUid && clusters.value.some(clu => clu.uid === clusUid)
      const needsRefresh = lastOrg !== org || !activeClusterKnown

      if (needsRefresh) {
        await getClusterMap({ organization: org })
      }

      lastOrg = org
      return
    }

    // No organization selected: drop everything cached for the previous one.
    clusterMap.value = {}
    flinkClusterMap.value = {}
    lastOrg = undefined
  }

  watch([organization, clusterUid], valueChanged)
  return valueChanged([initialState.organization, initialState.clusterUid])
}

/**
 * Composable entry point: bundles this module's cluster state, sizing constants
 * and actions into a single object for consumers to destructure.
 *
 * @returns an object exposing the module-level values listed below, unchanged
 */
export const useCluster = () => ({
  // cluster collections, option lists and derived state
  clusters,
  flinkClusters,
  bookieOptions,
  brokerOptions,
  flinkNodeOptions,
  clusterMap,
  clusterNames,
  flinkClusterNames,
  activeCluster,
  isActiveClusterReady,
  functionEnabled,
  currentClusterResource,
  // sizing constants and limits
  maxBrokerCu,
  maxBookieSu,
  throughputPerSu,
  bkReplFactor,
  memoryGbPerSu,
  memoryPerUnit,
  cpuPerUnit,
  minBookieSu,
  minBrokerCu,
  minThroughput,
  bookieMinimumNodeCount,
  brokerMinimumNodeCount,
  bookieMaximumNodeCount,
  brokerMaximumNodeCount,
  minResourceUnit,
  minFreeCU,
  freeResources,
  resourceOptions,
  clusterMetrics,
  // actions and helpers
  init,
  createCluster,
  updateClusterConfig,
  updateClusterConfigByResource,
  deleteFlinkCluster,
  createFlinkCluster,
  getFlinkCluster,
  getFlinkClusterMap,
  deleteCluster,
  getClusterMap,
  createClusterByResources,
  isCNPoolMember
})
