'use strict';

const Utils = require('../misc/utils');
const ZLib = require('zlib');

//increase by level to avoid buffer copy.
const SMALL_BUFFER_SIZE = 2048;
const MEDIUM_BUFFER_SIZE = 131072; //128k
const LARGE_BUFFER_SIZE = 1048576; //1M
const MAX_BUFFER_SIZE = 16777222; //16M + 7

/**
/**
 * MySQL compression filter.
 * see https://mariadb.com/kb/en/library/0-packet/#compressed-packet
 */
class CompressionOutputStream {
  /**
   * Constructor
   *
   * @param socket    current socket
   * @param opts      current connection options
   * @param info      current connection information
   * @constructor
   */
  constructor(socket, opts, info) {
    this.info = info;
    this.opts = opts;
    this.pos = 7;
    this.header = Buffer.allocUnsafe(7);
    this.buf = Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    this.writer = (buffer) => {
      socket.write(buffer);
    };
  }

  growBuffer(len) {
    let newCapacity;
    if (len + this.pos < MEDIUM_BUFFER_SIZE) {
      newCapacity = MEDIUM_BUFFER_SIZE;
    } else if (len + this.pos < LARGE_BUFFER_SIZE) {
      newCapacity = LARGE_BUFFER_SIZE;
    } else newCapacity = MAX_BUFFER_SIZE;

    let newBuf = Buffer.allocUnsafe(newCapacity);
    this.buf.copy(newBuf, 0, 0, this.pos);
    this.buf = newBuf;
  }

  writeBuf(arr, cmd) {
    let off = 0,
      len = arr.length;
    if (len > this.buf.length - this.pos) {
      if (this.buf.length !== MAX_BUFFER_SIZE) {
        this.growBuffer(len);
      }

      //max buffer size
      if (len > this.buf.length - this.pos) {
        //not enough space in buffer, will stream :
        // fill buffer and flush until all data are snd
        let remainingLen = len;

        while (true) {
          //filling buffer
          let lenToFillBuffer = Math.min(MAX_BUFFER_SIZE - this.pos, remainingLen);
          arr.copy(this.buf, this.pos, off, off + lenToFillBuffer);
          remainingLen -= lenToFillBuffer;
          off += lenToFillBuffer;
          this.pos += lenToFillBuffer;

          if (remainingLen === 0) return;
          this.flush(false, cmd, remainingLen);
        }
      }
    }
    arr.copy(this.buf, this.pos, off, off + len);
    this.pos += len;
  }

  /**
   * Flush the internal buffer.
   */
  flush(cmdEnd, cmd, remainingLen) {
    if (this.pos < 1536) {
      //*******************************************************************************
      // small packet, no compression
      //*******************************************************************************

      this.buf[0] = this.pos - 7;
      this.buf[1] = (this.pos - 7) >>> 8;
      this.buf[2] = (this.pos - 7) >>> 16;
      this.buf[3] = ++cmd.compressSequenceNo;
      this.buf[4] = 0;
      this.buf[5] = 0;
      this.buf[6] = 0;

      if (this.opts.debugCompress) {
        console.log(
          '==> conn:%d %s (compress)\n%s',
          this.info.threadId ? this.info.threadId : -1,
          cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
          Utils.log(this.opts, this.buf, 0, this.pos)
        );
      }

      this.writer(this.buf.slice(0, this.pos));
    } else {
      //*******************************************************************************
      // compressing packet
      //*******************************************************************************
      //use synchronous inflating, to ensure FIFO packet order
      const compressChunk = ZLib.deflateSync(this.buf.slice(7, this.pos));
      const compressChunkLen = compressChunk.length;

      this.header[0] = compressChunkLen;
      this.header[1] = compressChunkLen >>> 8;
      this.header[2] = compressChunkLen >>> 16;
      this.header[3] = ++cmd.compressSequenceNo;
      this.header[4] = this.pos - 7;
      this.header[5] = (this.pos - 7) >>> 8;
      this.header[6] = (this.pos - 7) >>> 16;

      if (this.opts.debugCompress) {
        console.log(
          '==> conn:%d %s (compress)\n%s',
          this.info.threadId ? this.info.threadId : -1,
          cmd ? cmd.constructor.name + '(0,' + this.pos + '=>' + compressChunkLen + ')' : 'unknown',
          Utils.log(this.opts, compressChunk, 0, compressChunkLen, this.header)
        );
      }

      this.writer(this.header);
      this.writer(compressChunk);
      if (cmdEnd && this.pos === MAX_BUFFER_SIZE) this.writeEmptyPacket(cmd);
      this.header = Buffer.allocUnsafe(7);
    }
    this.buf = remainingLen
      ? CompressionOutputStream.allocateBuffer(remainingLen)
      : Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    this.pos = 7;
  }

  static allocateBuffer(len) {
    if (len + 4 < SMALL_BUFFER_SIZE) {
      return Buffer.allocUnsafe(SMALL_BUFFER_SIZE);
    } else if (len + 4 < MEDIUM_BUFFER_SIZE) {
      return Buffer.allocUnsafe(MEDIUM_BUFFER_SIZE);
    } else if (len + 4 < LARGE_BUFFER_SIZE) {
      return Buffer.allocUnsafe(LARGE_BUFFER_SIZE);
    }
    return Buffer.allocUnsafe(MAX_BUFFER_SIZE);
  }

  writeEmptyPacket(cmd) {
    const emptyBuf = Buffer.from([0x00, 0x00, 0x00, cmd.compressSequenceNo, 0x00, 0x00, 0x00]);

    if (this.opts.debugCompress) {
      console.log(
        '==> conn:%d %s (compress)\n%s',
        this.info.threadId ? this.info.threadId : -1,
        cmd ? cmd.constructor.name + '(0,' + this.pos + ')' : 'unknown',
        Utils.log(this.opts, emptyBuf, 0, 7)
      );
    }

    this.writer(emptyBuf);
  }
}

module.exports = CompressionOutputStream;