const { KEEP_ALIVE_DATA } = require("../constants")

// Decoder strategy names. MessageDecoder.transform takes the split path only when the
// strategy equals SPLIT; buildSSEMessageDecoder uses PEEK when no strategy is given.
const SPLIT = "SPLIT"
const PEEK = "PEEK"

// Field names accepted as the key of a "key:value" line (checked by validateKey).
const validSSEKeys = ["id", "data", "retry", "event"]

// Key handed to parseData when MessageDecoder.emit decodes the assembled data field.
const DATA_KEY = "data"

/**
 * Decodes the raw value of a single field.
 *
 * Only the "data" field is JSON-decoded, and only when its raw value is not
 * the KEEP_ALIVE_DATA marker; every other value is handed back untouched.
 *
 * @param {string|null} key - field name the value belongs to.
 * @param {string} dataRaw - raw text of the field.
 * @returns {*} the JSON-decoded value, or `dataRaw` unchanged.
 * @throws {SyntaxError} when a "data" value is not valid JSON.
 */
const parseData = (key, dataRaw) => {
  // The key is checked first so non-data fields never consult the keep-alive marker.
  if (key !== "data") {
    return dataRaw
  }
  if (dataRaw === KEEP_ALIVE_DATA) {
    return dataRaw
  }
  return JSON.parse(dataRaw)
}

/**
 * Accepts a candidate field name only if it is listed in validSSEKeys.
 *
 * @param {string} key - candidate field name.
 * @returns {string|null} the same key when it is recognised, otherwise null.
 */
const validateKey = (key) => {
  if (!validSSEKeys.includes(key)) {
    return null
  }
  return key
}

/**
 * Splits one line of the stream into a `[key, value]` pair at its first colon.
 *
 * When the line has no colon, or the text before the colon is rejected by
 * validateKey, the key is reported as null and the whole line is returned as
 * the value. Otherwise the value is everything after the first colon, with no
 * trimming applied.
 *
 * @param {string} line - a single line without its terminating newline.
 * @returns {[string|null, string]} the validated key (or null) and the value text.
 */
const getKeyAndDataForLine = (line) => {
  const colonAt = line.indexOf(":")
  if (colonAt < 0) {
    return [null, line]
  }

  const key = validateKey(line.slice(0, colonAt))
  if (!key) {
    // Unrecognised prefix: keep the full line so the caller can treat it as a continuation.
    return [key, line]
  }
  return [key, line.slice(colonAt + 1)]
}

/**
 * Transformer object (start/transform) for a TransformStream that turns text
 * chunks of a server-sent-events stream into payload objects.
 *
 * Decoding state (`buf`, `pos`, `payload`, `lines`, `currentKey`) is stored on
 * the controller object that is passed to every call, not on the decoder.
 */
class MessageDecoder {
  strategy = null
  _logger = null
  _config = null

  /**
   * @param {string} strategy - SPLIT selects the split strategy; any other value uses the peek strategy.
   * @param {{logger?: Function, config?: Function}} [services] - optional accessors; each one is
   *   called with no arguments to obtain the logger / config object when needed.
   */
  constructor(strategy, services) {
    this.strategy = strategy
    this._logger = services?.logger
    this._config = services?.config
  }

  /**
   * Initialises the decoding state on the controller.
   *
   * @param {object} controller - stream controller that carries the decoder state.
   */
  start(controller) {
    controller.buf = ""
    controller.pos = 0
    controller.payload = {}
    controller.lines = []
  }

  /**
   * Split strategy: splits the chunk on "\n" and enqueues the payload gathered
   * so far whenever an empty line is met. The payload is local to the call, so
   * fields still pending at the end of a chunk are not carried to the next one.
   *
   * @param {string} chunk - text chunk to decode.
   * @param {object} controller - stream controller used to enqueue payloads.
   */
  /* istanbul ignore next */
  _transformSplitStrategy(chunk, controller) {
    //TODO: fix or remove this function
    let payload = null
    for (const line of chunk.split("\n")) {
      if (line.length === 0) {
        if (payload) {
          controller.enqueue(payload)
          payload = null
        }
        continue
      }
      const [key, dataRaw] = getKeyAndDataForLine(line)
      payload = {
        ...(payload || {}),
        [key]: parseData(key, dataRaw),
      }
    }
  }

  /**
   * Assembles the lines collected since the previous emit into one payload,
   * decodes its data field and enqueues it.
   *
   * Values pushed under the same key are concatenated in arrival order. The
   * collected lines are always cleared, whether or not a payload is enqueued.
   * Nothing is enqueued when no lines were collected, when the data field is
   * absent, or when decoding/enqueueing throws (a warning is logged instead).
   *
   * @param {object} controller - stream controller holding `lines` and `buf`.
   * @param {string} chunk - chunk being processed; only used in log output.
   */
  emit(controller, chunk) {
    const pendingLines = controller.lines
    controller.payload = {}
    controller.lines = []

    // A blank line with nothing collected (leading or repeated blank lines)
    // carries no message: return quietly instead of logging an error for an
    // empty payload.
    if (pendingLines.length === 0) {
      return
    }

    const payload = {}
    for (const [key, value] of pendingLines) {
      payload[key] = payload[key] ? payload[key] + value : value
    }

    try {
      if (!payload.id || !payload.data) {
        this._logger?.()?.error("FetchEventStream: payload is missing required fields", payload, controller.buf)
      }
      // Without a data field there is nothing to decode; stop here explicitly
      // rather than relying on a TypeError from calling replace on undefined.
      if (typeof payload.data !== "string") {
        return
      }
      // Lines produced by the peek strategy never contain "\n"; the replace is
      // kept as a defensive measure for other callers.
      payload.data = parseData(DATA_KEY, payload.data.replace("\n", ""))
      controller.enqueue(payload)
    } catch (err) {
      this._logger?.()?.warn(
        `FetchEventStream: failed when parsing chunk ${chunk} from buffer ${controller.buf}. Raw bytestream:`,
        new TextEncoder().encode(controller.buf)
      )
    }
  }

  /**
   * Consumes one line of the stream. An empty line ends the current message
   * and triggers emit; any other line is stored as a `[key, value]` pair.
   * A line without a recognised key is stored under the most recently seen
   * key (`controller.currentKey`) after a warning is logged.
   *
   * @param {string} line - line text without its terminating "\n".
   * @param {object} controller - stream controller holding the decoder state.
   * @param {string} chunk - chunk being processed; forwarded to emit for logging.
   */
  feedLine(line, controller, chunk) {
    // Lines are cut at "\n" only, so a CRLF-terminated stream leaves a trailing
    // "\r" on every line; drop it so blank lines are still recognised.
    const text = line.endsWith("\r") ? line.slice(0, -1) : line
    if (text.length === 0) {
      return this.emit(controller, chunk)
    }
    const [key, dataRaw] = getKeyAndDataForLine(text)

    if (!key) {
      this._logger?.()?.warn?.(`FetchEventStream: Incomplete message received, no key was detected. ${controller.buf}`)
    }

    const currentKey = key || controller.currentKey
    controller.currentKey = currentKey
    controller.lines.push([currentKey, dataRaw])
  }

  /**
   * Peek strategy: appends the chunk to the controller buffer and feeds every
   * complete ("\n"-terminated) line to feedLine. Text after the last newline
   * stays in `controller.buf` until a later chunk completes it, and
   * `controller.pos` records how much of the buffer has already been scanned.
   *
   * @param {string} chunk - text chunk to decode.
   * @param {object} controller - stream controller holding `buf` and `pos`.
   */
  _transformPeekStategy(chunk, controller) {
    controller.buf += chunk

    // Resume the search where the previous call stopped; everything before
    // controller.pos is already known to contain no newline.
    let newlineAt = controller.buf.indexOf("\n", controller.pos)
    while (newlineAt !== -1) {
      const line = controller.buf.substring(0, newlineAt)
      controller.buf = controller.buf.substring(newlineAt + 1)
      controller.pos = 0
      this.feedLine(line, controller, chunk)
      newlineAt = controller.buf.indexOf("\n")
    }
    controller.pos = controller.buf.length
  }

  /**
   * Decodes one incoming chunk with the configured strategy. Errors thrown by
   * the strategy are logged and not rethrown.
   *
   * @param {string} chunk - text chunk to decode.
   * @param {object} controller - stream controller holding the decoder state.
   */
  transform(chunk, controller) {
    if (this._config?.()?.get("sse.v2.eventStream.logRawPayloads")) {
      this._logger?.()?.debug("SSEPipeline: Incoming SSE Raw payload", chunk)
    }
    try {
      if (this.strategy === SPLIT) {
        this._transformSplitStrategy(chunk, controller)
      } else {
        this._transformPeekStategy(chunk, controller)
      }
    } catch (err) {
      //TODO: implement peek strategy error signaling
      this._logger?.()?.error(
        `SSEPipeline: An error has occurred while processing stream chunk under ${this.strategy}`,
        err,
        chunk
      )
    }
  }
}

/**
 * Creates a TransformStream whose transformer is a new MessageDecoder.
 *
 * @param {string} [strategy=PEEK] - decoding strategy handed to the decoder.
 * @param {object} [services] - optional services object handed to the decoder.
 * @returns {TransformStream} stream backed by the new decoder.
 */
const buildSSEMessageDecoder = (strategy = PEEK, services) => {
  const decoder = new MessageDecoder(strategy, services)
  const stream = new TransformStream(decoder)
  return stream
}

// Module exports: the decoder class, the factory that wraps it in a
// TransformStream, and the two strategy name constants.
module.exports = {
  MessageDecoder,
  buildSSEMessageDecoder,
  PEEK,
  SPLIT,
}
