import { peekNthMessage } from '@/api/topics'
import {
  getTopics as apiGetTopics,
  createTopic as apiCreateTopic,
  updateTopic as apiUpdateTopic,
  deleteTopic as apiDeleteTopic,
  fetchTopicStats,
  createSubscription as apiCreateSubscription,
  deleteSubscription as apiDeleteSubscription,
  resetCursor,
  skipAllMessages,
  getTopicBundle,
  getTopicBroker as apiGetTopicBroker,
  fetchAllSchemas,
  deleteSchema,
  postSchema,
  getInternalStats,
  unloadTopic as apiUnloadTopic,
  compactionStatus as apiCompactionStatus,
  compact,
  getPermissionsOnTopic,
  grantPermissionsOnTopic,
  revokePermissionsOnTopic,
  getPartitionedMetadata
} from '@/api/topics'
import { getTopicsMetrics } from '@/api/metrics'
import { i18n } from '@/lang'
import {
  LongRunningProcessStatusStatusEnum,
  type GetSchemaResponse,
  type GetSchemaResponseTypeEnum,
  type InlineResponse200,
  type NonPersistentTopicStats,
  type PartitionedTopicStatsImpl,
  type PersistentTopicInternalStats,
  type PartitionedTopicInternalStats,
  type TopicStats
} from '@streamnative/pulsar-admin-client-typescript'
import type { Cluster } from './useCluster'
import type { PulsarState } from './usePulsarState'
import Day from 'dayjs'
import axios from 'axios'
type TopicBroker = InlineResponse200
import { chunk } from 'lodash-es'

export const PARTITION_SEPARATOR = '-partition-'
export const PERSISTENT_PROTOCOL = 'persistent://'
export const NON_PERSISTENT_PROTOCOL = 'non-persistent://'
export type TopicMap = Record<string, TopicDetails> // short topic name to topic details
export type Message = {
  properties: string
  messageId: string
  message: string
  ledgerId: string
  entryId: string
  data: string
}
export type TopicMetric = {
  // storage size
  storageSize: number | undefined
  // backlog size
  backlogSize: number | undefined
}
export type TopicDetails = {
  organization: string
  clusterUid: string
  persistency: boolean
  tenant: string
  namespace: string
  topicName: string // i.e. `c1` and `c2-partition-0`
  topicRootName: string // i.e. `c1` and `c2`
  topicFullName: string // i.e. persistent://a/b/c2-partitioned-1

  /**
   * undefined if topic is not a partitioned.  i.e. undefined for `c1`
   * show partition id if defined. i.e. 1 for `c2-partitioned-1`
   */
  partitionId: string | undefined
  /**
   * 0 if topic it self is not partitioned. i.e. 0 for `c2-partitioned-0`
   * positive number if topic is partitoned. i.e. 2 for `c2`
   * undefined means we don't know yet
   */
  partitionCount: number | undefined

  metrics: TopicMetric | undefined
}

let lastSeen: string | undefined = undefined
const { t } = i18n.global
const topicMap = ref<TopicMap>({})
const topicStats = ref<PartitionedTopicStatsImpl | TopicStats | NonPersistentTopicStats>({})
const bundleRange = ref('')
const topicBroker = ref<TopicBroker>({
  brokerUrl: '',
  httpUrl: '',
  nativeUrl: '',
  brokerUrlSsl: ''
})
const schemas = ref<Array<GetSchemaResponse>>([])
const topicInternalStats = ref<PersistentTopicInternalStats>({})
const partitionedTopicnternalStats = ref<PartitionedTopicInternalStats>({})
const compactionStatus = ref<LongRunningProcessStatusStatusEnum>(
  LongRunningProcessStatusStatusEnum.NOTRUN
)
const authorizedRoles = ref<Record<string, string[]>>({})
const messages = ref<Message[]>([])
const errorGettingTopics = ref<{ message: string } | undefined>(undefined)

const resetTopicState = () => {
  topicMap.value = {}
  topicStats.value = {}
  bundleRange.value = ''
  topicBroker.value = {
    brokerUrl: '',
    httpUrl: '',
    nativeUrl: '',
    brokerUrlSsl: ''
  }
  schemas.value = []
  topicInternalStats.value = {}
  partitionedTopicnternalStats.value = {}
  compactionStatus.value = LongRunningProcessStatusStatusEnum.NOTRUN
  authorizedRoles.value = {}
  messages.value = []
}

const topicFullNameToDetail = (
  topicFullName: string,
  partitionCount: number | undefined,
  metrics?: TopicMetric,
  params?:
    | {
        organization: string
        clusterUid: string
        tenant: string
        namespace: string
      }
    | undefined
): TopicDetails => {
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  const [protocol, _, tenant, namespace, topicName] = topicFullName.split('/')
  const [topicRootName, partitionId] = (topicName ?? '').split(PARTITION_SEPARATOR)
  if (!params) {
    params = apiBaseParams()
  }
  return {
    organization: params.organization,
    clusterUid: params.clusterUid,
    persistency: `${protocol}//` === PERSISTENT_PROTOCOL,
    tenant,
    namespace,
    topicName,
    topicRootName,
    topicFullName,
    partitionId,
    partitionCount,
    metrics: metrics || { storageSize: undefined, backlogSize: undefined }
  }
}

/**
 * Given a list of topic names, fetch partition count if their partition count is undefined.
 * This will allow us lazily fetch partition counts if we don't know.
 * @param topicNames
 */
const ensureTopicPartitionIsLoaded = async (topicNames: string[]) => {
  const _topicMap = await ensureTopicPartitionIsLoadedWithTopicMap(topicNames, topicMap.value)
  topicMap.value = _topicMap
}

/**
 * Given a list of topic names, fetch partition count if their partition count is undefined.
 * This will allow us lazily fetch partition counts if we don't know.
 * @param topicNames
 */
const ensureTopicPartitionIsLoadedWithTopicMap = async (
  topicNames: string[],
  topicMap: TopicMap
): Promise<TopicMap> => {
  const topicMapCopy = { ...topicMap }

  const promises = topicNames.map(async topicName => {
    const topicDetail = topicMapCopy[topicName]
    if (topicDetail && topicDetail.partitionCount === undefined) {
      const partitionMetaResp = await getPartitionedMetadata(apiPerTopicParams(topicName))
      const partitionCount = partitionMetaResp.data.partitions
      if (partitionCount === undefined || partitionCount < 0) {
        throw Error(`partition count of ${partitionCount} is invalid`)
      }
      topicDetail.partitionCount = partitionCount
      topicMapCopy[topicDetail.topicName] = topicDetail

      for (let i = 0; i < partitionCount; i++) {
        const partitionedTopicFullname = `${topicDetail.topicFullName}${PARTITION_SEPARATOR}${i}`
        const partitionedTopicDetail = topicFullNameToDetail(partitionedTopicFullname, 0)
        topicMapCopy[partitionedTopicDetail.topicName] = partitionedTopicDetail
      }
    }
  })
  await Promise.all(promises)
  return topicMapCopy
}

const ensureTopicMetricsIsLoaded = async (topicNames: string[]) => {
  const { isCNPoolMember, activeCluster } = useCluster()
  if (isCNPoolMember(activeCluster.value?.webServiceUrl)) {
    return
  }
  const { mustTenant, mustNamespace } = usePulsarState()
  const topicMapCopy = { ...topicMap.value }
  const topics = topicNames.map(topicName => {
    const topicDetail = topicMapCopy[topicName]
    return {
      scheme: topicDetail.persistency ? 'persistent' : 'non-persistent',
      partitioned: !!topicDetail.partitionCount,
      name: topicName
    }
  })
  if (topics.length > 0) {
    const start = Day().subtract(15, 'm').unix()
    const end = Day().unix()
    const _topicsChunks = chunk(topics, 50) // 50 is the max number of topics that can be queried at once for metrics api request
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    let result: any[] = []
    // 'cause metrics server concurrent request limit, we request metrics in chunks one by one
    for (const _topicChunk of _topicsChunks) {
      const metrics = await getTopicsMetrics(
        mustTenant(),
        mustNamespace(),
        _topicChunk,
        start,
        end
      ).catch(error => {
        console.error(error)
        useError(getErrorMessage(error))
      })
      if (metrics && metrics.result) {
        result = result.concat(metrics.result)
      }
    }

    const topicsMetrics = result.map(
      ({
        metric: {
          name,
          labels: { topic }
        },
        values
      }) => {
        const value = values && values.length > 0 ? values[values.length - 1].value : 0
        const topicName = topic.substring(topic.lastIndexOf('/') + 1)
        return { topic: topicName, metric: name, value }
      }
    )
    topicNames.forEach(async topicName => {
      const topicDetail = topicMapCopy[topicName]
      if (topicDetail) {
        const storageSize = (
          topicsMetrics.find(({ topic, metric }) => {
            return topic === topicName && metric === 'pulsar_storage_size'
          }) || { value: 0 }
        ).value
        const backlogSize = (
          topicsMetrics.find(({ topic, metric }) => {
            return topic === topicName && metric === 'pulsar_storage_backlog_size'
          }) || { value: 0 }
        ).value
        topicMapCopy[topicDetail.topicName] = {
          ...topicDetail,
          metrics: { storageSize, backlogSize }
        }
      }
    })
    topicMap.value = topicMapCopy
  }
}

const getTopics = async (params: {
  organization: string
  clusterUid: string
  tenant: string
  namespace: string
}) => {
  const { mustTenant, mustNamespace } = usePulsarState()
  let topics = {}
  try {
    topics = await getTopicsWithTenantAndNamespace({
      ...params,
      tenant: mustTenant(),
      namespace: mustNamespace()
    })
    errorGettingTopics.value = undefined
  } catch (e) {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    errorGettingTopics.value = { message: e.toString() }
    throw e
  }

  // for noPersPart, we can infer partition details from noPersNoPart if there are data for it.
  // if noPersNoPart is missing partition info, partition info will be laziy fetch via `ensureTopicPartitionIsLoaded`.
  // Object.values(noPersPart).forEach(detail => {
  //   if (detail.partitionId) {
  //     // increment partition count as persNoPart includes individual partitioned topic names
  //     const currentPartitionCount = noPersNoPart[detail.topicRootName].partitionCount
  //     noPersNoPart[detail.topicRootName].partitionCount = (currentPartitionCount ?? 0) + 1
  //   }
  // })
  topicMap.value = topics
}

const getTopicsWithTenantAndNamespace = async (params: {
  organization: string
  clusterUid: string
  tenant: string
  namespace: string
}): Promise<TopicMap> => {
  const topicResponses = await Promise.all([
    apiGetTopics({ ...params, persistent: true, partitioned: true }),
    apiGetTopics({ ...params, persistent: true, partitioned: false }),
    apiGetTopics({ ...params, persistent: false, partitioned: true }),
    apiGetTopics({ ...params, persistent: false, partitioned: false })
  ])
  const [persPart, persNoPart, noPersNoPart, noPersPart] = topicResponses.map(res => {
    return Object.fromEntries(
      res.data.map((name, i) => {
        const partitionCount = i === 0 || i === 2 ? undefined : 0
        const topicDetail = topicFullNameToDetail(name, partitionCount, undefined, params)
        return [topicDetail.topicName, topicDetail]
      })
    )
  })

  // for persPart, we can infer partition details from persNoPart.
  Object.values(persNoPart).forEach(detail => {
    // there are some cases where non partitioned api returnes partitioned topic such as `abc-partition-0`
    // but persistent topic doesn't `abc` doesn't exists.  This seems like a pulsar bug and we should simply
    // ignore those topics.
    if (detail.partitionId && persPart[detail.topicRootName]) {
      // increment partition count as persNoPart includes individual partitioned topic names
      const currentPartitionCount = persPart[detail.topicRootName].partitionCount
      persPart[detail.topicRootName].partitionCount = (currentPartitionCount ?? 0) + 1
    }
  })

  return {
    ...persPart,
    ...persNoPart,
    ...noPersPart,
    ...noPersNoPart
  }
}

const apiBaseParams = () => {
  const { mustOrganization, mustClusterUid, mustTenant, mustNamespace } = usePulsarState()
  return {
    organization: mustOrganization(),
    clusterUid: mustClusterUid(),
    tenant: mustTenant(),
    namespace: mustNamespace()
  }
}

/**
 * This function will create params with the global topic map
 */
const apiPerTopicParams = (shortTopicName?: string) => {
  const { mustTopic } = usePulsarState()
  return apiPerTopicParamsWithTopicMap(shortTopicName ?? mustTopic(), topicMap.value)
}

/**
 * This function will create params with a specify topic map
 */
const apiPerTopicParamsWithTopicMap = (shortTopicName: string, topicMap: TopicMap) => {
  const { mustOrganization, mustClusterUid, mustTenant, mustNamespace } = usePulsarState()
  const topicDetail = topicMap[shortTopicName]
  return {
    name: topicDetail.topicName,
    partitioned: (topicDetail.partitionCount ?? 0) > 0,
    organization: topicDetail.organization ?? mustOrganization(),
    clusterUid: topicDetail.clusterUid ?? mustClusterUid(),
    tenant: topicDetail.tenant ?? mustTenant(),
    namespace: topicDetail.namespace ?? mustNamespace(),
    persistent: topicDetail.persistency
  }
}

/**
 * This function will impact the global topic map
 */
const createTopic = async ({
  name, // just a short topic name without protocl, tenant and namespace
  persistent,
  partitionCount
}: {
  name: string
  persistent: boolean
  partitionCount: number
}) => {
  const { mustTenant, mustNamespace } = usePulsarState()
  await createTopicWithTenantAndNamespace({
    name,
    persistent,
    partitionCount,
    tenant: mustTenant(),
    namespace: mustNamespace()
  })
  // add newly created topic to the topic map
  const protocol = persistent ? PERSISTENT_PROTOCOL : NON_PERSISTENT_PROTOCOL
  const fullTopicName = `${protocol}${mustTenant()}/${mustNamespace()}/${name}`
  const topicDetail = topicFullNameToDetail(fullTopicName, partitionCount)
  const topicMapCopy = { ...topicMap.value, [topicDetail.topicName]: topicDetail }
  for (let i = 0; i < partitionCount; i++) {
    const partitionedTopicDetail = topicFullNameToDetail(
      `${fullTopicName}${PARTITION_SEPARATOR}${i}`,
      0
    )
    topicMapCopy[partitionedTopicDetail.topicName] = partitionedTopicDetail
  }
  topicMap.value = topicMapCopy
}

/**
 * This function will not impact the global topic map,
 * sometimes we just temprorary create a topic for a specified tenant/ namespace, but not for the global tenant / namespace
 * for example, in clients page, we select the tenant and namespace to create a topic but not expect to impact the global topic list for global tenant/namespace selection.
 */
const createTopicWithTenantAndNamespace = async ({
  name, // just a short topic name without protocl, tenant and namespace
  persistent,
  partitionCount,
  tenant,
  namespace
}: {
  name: string
  persistent: boolean
  partitionCount: number
  tenant: string
  namespace: string
}) => {
  if (!name) {
    throw Error(t('topic.topicPlaceholder'))
  }
  if (partitionCount < 0) {
    throw Error(t('topic.partitionPlaceholder'))
  }
  const params = { ...apiBaseParams(), tenant, namespace }
  try {
    await apiCreateTopic({
      ...params,
      persistent,
      partitioned: partitionCount > 0,
      name,
      partitionCount
    })
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.createTopicFailed')))
  }
}

/**
 * This function will impact the global topic map
 */
const updateTopic = async ({
  name, // just a short topic name without protocl, tenant and namespace
  persistent,
  partitionCount
}: {
  name: string
  persistent: boolean
  partitionCount: number
}) => {
  const { mustTenant, mustNamespace } = usePulsarState()
  await updateTopicWithTenantAndNamespace({
    name,
    persistent,
    partitionCount,
    tenant: mustTenant(),
    namespace: mustNamespace()
  })
  // update the topic in the topic map
  const protocol = persistent ? PERSISTENT_PROTOCOL : NON_PERSISTENT_PROTOCOL
  const fullTopicName = `${protocol}${mustTenant()}/${mustNamespace()}/${name}`
  const topicDetail = topicFullNameToDetail(fullTopicName, partitionCount)
  const topicMapCopy = { ...topicMap.value, [topicDetail.topicName]: topicDetail }
  for (let i = 0; i < partitionCount; i++) {
    const partitionedTopicDetail = topicFullNameToDetail(
      `${fullTopicName}${PARTITION_SEPARATOR}${i}`,
      0
    )
    topicMapCopy[partitionedTopicDetail.topicName] = partitionedTopicDetail
  }
  topicMap.value = topicMapCopy
}

/**
 * This function will not impact the global topic map,
 * sometimes we just temprorary create a topic for a specified tenant/ namespace, but not for the global tenant / namespace
 * for example, in clients page, we select the tenant and namespace to create a topic but not expect to impact the global topic list for global tenant/namespace selection.
 */
const updateTopicWithTenantAndNamespace = async ({
  name, // just a short topic name without protocl, tenant and namespace
  persistent,
  partitionCount,
  tenant,
  namespace
}: {
  name: string
  persistent: boolean
  partitionCount: number
  tenant: string
  namespace: string
}) => {
  if (!name) {
    throw Error(t('topic.topicPlaceholder'))
  }
  const selectedTopicDetail = topicMap.value[name]
  const minPartitionCount = selectedTopicDetail.partitionCount ?? 0
  if (partitionCount < minPartitionCount) {
    throw Error(t('topic.partitionPlaceholder'))
  }
  const params = { ...apiBaseParams(), tenant, namespace }
  try {
    await apiUpdateTopic({
      ...params,
      persistent,
      partitioned: partitionCount > 0,
      name,
      partitionCount
    })
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.updateTopicFailed')))
  }
}

const deleteTopic = async (shortTopicName: string) => {
  await ensureTopicPartitionIsLoaded([shortTopicName])
  const params = apiPerTopicParams(shortTopicName)
  await apiDeleteTopic(params)
  const topicValueCopy = {
    ...topicMap.value
  }
  delete topicValueCopy[params.name]
  topicMap.value = topicValueCopy
}

/**
 * This function will impact the global topic stats
 */
const getTopicStats = async (shortTopicName: string) => {
  topicStats.value = await getTopicStatsWithTopicMap(shortTopicName, topicMap.value)
}

/**
 * This function will not impact the global topic stats, it just return the topic stats data from the response
 */
const getTopicStatsWithTopicMap = async (
  shortTopicName: string,
  topicMap: TopicMap
): Promise<TopicStats | NonPersistentTopicStats> => {
  Object.assign(
    topicMap,
    await ensureTopicPartitionIsLoadedWithTopicMap([shortTopicName], topicMap)
  )
  const params = apiPerTopicParamsWithTopicMap(shortTopicName, topicMap)
  try {
    return (await fetchTopicStats(params)).data
  } catch (e) {
    throw Error(
      getErrorMessage(
        e,
        t(
          `topic.notification.${
            params.partitioned ? 'getPartitionedTopicStats' : 'getTopicsStatsFailed'
          }`
        )
      )
    )
  }
}

const createSubscription = async (shortTopicName: string, subscriptionName: string) => {
  await createSubscriptionWithTopicMap(shortTopicName, subscriptionName, topicMap.value)
}

const createSubscriptionWithTopicMap = async (
  shortTopicName: string,
  subscriptionName: string,
  topicMap: TopicMap
) => {
  if (!subscriptionName) {
    throw Error(t('topic.subscription.subNotification'))
  }
  Object.assign(
    topicMap,
    await ensureTopicPartitionIsLoadedWithTopicMap([shortTopicName], topicMap)
  )
  const params = apiPerTopicParamsWithTopicMap(shortTopicName, topicMap)

  try {
    await apiCreateSubscription({
      ...params,
      subscriptionName
    })
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.subscription.failedToCreate')))
  }

  await getTopicStatsWithTopicMap(shortTopicName, topicMap)
}

const deleteSubscription = async (shortTopicName: string, subscriptionName: string) => {
  await apiDeleteSubscription({
    ...apiPerTopicParams(shortTopicName),
    subscriptionName
  })
  const topicSubs = topicStats.value.subscriptions ?? {}
  delete topicSubs[subscriptionName]
  topicStats.value = { ...topicStats.value, subscriptions: topicSubs }
}

const resetAllSubMessage = async (
  shortTopicName: string,
  subscriptionName: string,
  minutes: number
) => {
  const timestamp = Math.floor(new Date().getTime()) - minutes * 60000
  try {
    await resetCursor({ ...apiPerTopicParams(shortTopicName), subscriptionName, timestamp })
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.subscription.failedToReset')))
  }
}

const clearAllSubMessage = async (shortTopicName: string, subscriptionName: string) => {
  try {
    await skipAllMessages({ ...apiPerTopicParams(shortTopicName), subscriptionName })
  } catch (e) {
    throw Error(getErrorMessage(e))
  }
}

const getBundleRange = async (shortTopicName: string) => {
  try {
    const _bundleRange = await getTopicBundle(apiPerTopicParams(shortTopicName))
    bundleRange.value = _bundleRange.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getBundleRange Error'))
  }
}

const getTopicBroker = async (shortTopicName: string) => {
  try {
    const _topicBroker = await apiGetTopicBroker(apiPerTopicParams(shortTopicName))
    topicBroker.value = _topicBroker.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getAllSchemas Error'))
  }
}

const getAllSchemas = async (shortTopicName: string) => {
  try {
    const _schemas = await fetchAllSchemas(apiPerTopicParams(shortTopicName))
    schemas.value = _schemas.data.getSchemaResponses ?? []
  } catch (e) {
    throw Error(getErrorMessage(e, 'getAllSchemas Error'))
  }
}

const deleteAllSchemas = async (shortTopicName: string) => {
  try {
    await deleteSchema(apiPerTopicParams(shortTopicName))
    schemas.value = []
  } catch (e) {
    throw Error(getErrorMessage(e, t('schema.deleteSchemaFailed')))
  }
}

const createSchema = async (
  shortTopicName: string,
  type: GetSchemaResponseTypeEnum,
  data: string,
  propertyEntries: [string, string][]
) => {
  const keys = new Set()
  propertyEntries.forEach(([key, value]) => {
    if (!key || !value) {
      throw Error('A key and a value must be defined for all properties')
    }
    keys.add(key)
  })

  if (keys.size !== propertyEntries.length) {
    throw Error(t('schema.keyIsExistNotification'))
  }

  const properties = Object.fromEntries(propertyEntries)
  const res = await postSchema({
    ...apiPerTopicParams(shortTopicName),
    body: {
      type,
      schema: data,
      properties
    }
  })

  const newSchema: GetSchemaResponse = {
    // eslint-disable-next-line @typescript-eslint/ban-ts-comment
    // @ts-ignore
    version: res.data?.version?.version,
    type,
    data,
    properties
  }
  const newSchemas = schemas.value
  newSchemas.push(newSchema)
  schemas.value = newSchemas
}

const getTopicStatsInternal = async (shortTopicName: string) => {
  const params = apiPerTopicParams(shortTopicName)
  const { data } = await getInternalStats(params)
  if (params.partitioned) {
    partitionedTopicnternalStats.value = data as PartitionedTopicInternalStats
    topicInternalStats.value = {}
  } else {
    partitionedTopicnternalStats.value = {}
    topicInternalStats.value = data as PersistentTopicInternalStats
  }
}

const unloadTopic = async (shortTopicName: string) => {
  await apiUnloadTopic(apiPerTopicParams(shortTopicName))
}

const getCompactionStatus = async (shortTopicName: string) => {
  try {
    const { data } = await apiCompactionStatus(apiPerTopicParams(shortTopicName))
    compactionStatus.value = data.status ?? LongRunningProcessStatusStatusEnum.NOTRUN
  } catch (e) {
    throw Error(getErrorMessage(e, t('topic.notification.compactionFailed')))
  }
}

const compactTopic = async (shortTopicName: string) => {
  await compact(apiPerTopicParams(shortTopicName))
  await getCompactionStatus(shortTopicName)
}

const getPermissions = async (shortTopicName: string) => {
  try {
    const permissions = await getPermissionsOnTopic(apiPerTopicParams(shortTopicName))
    authorizedRoles.value = permissions.data
  } catch (e) {
    throw Error(getErrorMessage(e, 'getPermissions error'))
  }
}

const grantPermissions = async ({ role, permissions }: { role: string; permissions: string[] }) => {
  grantPermissionsOnTopic({ ...apiPerTopicParams(), role, permissions })
  authorizedRoles.value = { ...authorizedRoles.value, [role]: permissions }
}

const revokePermissions = async ({ role }: { role: string }) => {
  await revokePermissionsOnTopic({ ...apiPerTopicParams(), role })

  const newRoles = { ...authorizedRoles.value }
  delete newRoles[role]
  authorizedRoles.value = newRoles
}
const viewMessages = async (shortTopicName: string, subscriptionName: string, count: number) => {
  if (count <= 0) throw t('topic.subscription.messageGreaterThanZero')
  if (count > 100) throw t('schema.maxMessageNotification')
  if (!subscriptionName) throw t('topic.selectSubMessage')

  // Below code is extraction of logics at pulsar admin client
  // https://github.com/streamnative/pulsar/blob/master/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java#L880-L913
  messages.value = []
  try {
    for (let i = 1; i <= count; i++) {
      const res = await peekNthMessage({
        ...apiPerTopicParams(),
        subscriptionName,
        n: i
      })

      const properties: Record<string, string | string[]> = {}
      Object.entries(res.headers).forEach(([key, value]) => {
        if (key.startsWith('x-pulsar-property-')) {
          properties[key.slice('x-pulsar-property-'.length)] = value
        } else if (key === 'x-pulsar-batch-size' || key === 'x-pulsar-num-batch-message') {
          properties[key] = value
        }
      })
      const messageId = res.headers['x-pulsar-message-id']
      const tokenizedMessageId = messageId.split(':')
      const payload = new Uint8Array(res.data as unknown as ArrayBuffer)
      // returned binary is java serialized object.
      // there is no good library at npm to deserialize java object without defining java object schema.
      // so simply find the the separator for the payload and extract value is the simplest way.
      const data = payload.slice(payload.indexOf(2) + 1)
      const dataString = new TextDecoder().decode(data)

      messages.value = [
        ...messages.value,
        {
          properties: JSON.stringify(properties, null, 2),
          messageId: res.headers['x-pulsar-message-id'],
          message: dataString,
          ledgerId: tokenizedMessageId[0],
          entryId: tokenizedMessageId[1],
          data: dataString
        }
      ]
    }
  } catch (e) {
    // peekNthMessage() will respond with 404 when n is bigger than message it has.
    // in such case, simply catch and ignore as no more messages to read.
    if (!axios.isAxiosError(e) || e.response?.status !== 404) {
      throw e
    }
  }
}

export const init = (initialState: PulsarState) => {
  const { organization, tenant, namespace } = usePulsarState()
  const { activeCluster } = useCluster()
  const valueChanged = async ([org, clus, ten, ns]: [
    string | undefined,
    Cluster | undefined,
    string | undefined,
    string | undefined
  ]) => {
    const isClusterReady = clus?.conditions.Ready ?? false
    const clusterUid = clus?.uid ?? undefined
    if (!org || !clusterUid || !isClusterReady || !ten || !ns) {
      topicMap.value = {}
      lastSeen = undefined
      return
    }
    const seen = `${org}/${clusterUid}/${ten}/${ns}`
    if (lastSeen !== seen) {
      await getTopics({
        organization: org,
        clusterUid: clusterUid,
        tenant: ten,
        namespace: ns
      })
    }

    lastSeen = seen
  }

  watch([organization, activeCluster, tenant, namespace], valueChanged)
  return valueChanged([
    initialState.organization,
    activeCluster.value,
    initialState.tenant,
    initialState.namespace
  ])
}

export const useTopic = () => {
  return {
    topicMap,
    topicDetails: computed(() => {
      // contains all topics: i.e. [c1, c2, c2-partition-0, c2-partition-1]
      return Object.values(topicMap.value).sort((a, b) => a.topicName.localeCompare(b.topicName))
    }),
    topicRootDetails: computed(() => {
      // only contains root topics.  i.e. [c1, c2]
      return Object.values(topicMap.value)
        .filter(detail => detail.partitionId === undefined)
        .sort((a, b) => a.topicName.localeCompare(b.topicName))
    }),
    // selectedTopicDetail,
    topicStats,
    bundleRange,
    topicBroker,
    schemas,
    topicInternalStats,
    partitionedTopicnternalStats,
    compactionStatus,
    authorizedRoles,
    messages,
    getTopics,
    getTopicsWithTenantAndNamespace,
    createTopic,
    createTopicWithTenantAndNamespace,
    updateTopic,
    updateTopicWithTenantAndNamespace,
    deleteTopic,
    getTopicStats,
    getTopicStatsWithTopicMap,
    createSubscription,
    createSubscriptionWithTopicMap,
    deleteSubscription,
    resetAllSubMessage,
    clearAllSubMessage,
    getBundleRange,
    getTopicBroker,
    getAllSchemas,
    deleteAllSchemas,
    createSchema,
    getTopicStatsInternal,
    unloadTopic,
    compactTopic,
    getPermissions,
    grantPermissions,
    revokePermissions,
    viewMessages,
    resetTopicState,
    getCompactionStatus,
    ensureTopicPartitionIsLoaded,
    topicFullNameToDetail,
    ensureTopicMetricsIsLoaded,
    init,
    errorGettingTopics
  }
}
