import * as aq from 'arquero';
// eslint-disable-next-line import/no-extraneous-dependencies
import * as arrow from 'apache-arrow';
// eslint-disable-next-line import/no-extraneous-dependencies
import { makeVector, Type } from 'apache-arrow';

const types = { cache_from: Type.Utf8, date: Type.Utf8 };

const findMissingColumns = (table, columns) => {
  const cols = table.columnNames();
  const missingColumns = columns.filter(c => !cols.includes(c));
  return missingColumns;
};

const createColumnsWithDefaults = (table, missingColumns, columnDefaults) => {
  const defaultArrays = missingColumns.reduce((acc, colName) => ({
    ...acc,
    [colName]: columnDefaults[colName] === 'bigint' ? new BigInt64Array(table.size)
      : columnDefaults[colName] === 'number' ? new Float64Array(table.size)
        : new Array(table.size).fill(columnDefaults[colName]),
  }), {});

  const localTypes = missingColumns.reduce((acc, colName) => ({
    ...acc,
    [colName]: columnDefaults[colName] === '0n' ? Type.Int64
      : columnDefaults[colName] === '0.0' ? Type.Float64
        : Type.Utf8,
  }), {});

  const defaultTable = aq.fromArrow(aq.table(defaultArrays).toArrow({ types: localTypes }));

  return table.assign(defaultTable);
};

const HACK_renameColumnsIfExist = t => {
  const defaultRename = {
    conversions_even: 'even',
    conversions_first_touch: 'first_touch',
    conversions_last_touch: 'last_touch',
    conversions_normalized: 'normalized',
    ntf_conversions_even: 'ntf_even',
    ntf_conversions_first_touch: 'ntf_first_touch',
    ntf_conversions_last_touch: 'ntf_last_touch',
    ntf_conversions_normalized: 'ntf_normalized',
  };

  const cols = t.columnNames();
  const rename = Object.keys(defaultRename).reduce((p, c) => {
    if (cols.includes(c)) {
      return Object.assign(p, { [c]: defaultRename[c] });
    }
    return p;
  }, {});

  return t
    .rename(rename);
};

export const getColumnDefaults = allTables => allTables.reduce((obj, t) => {
  const newObj = { ...obj };
  t.columnNames().forEach(col => {
    if (!newObj[col]) {
      const colType = typeof t.array(col)[0];
      newObj[col] = colType === 'bigint' ? '0n' : colType === 'number' ? '0.0' : colType === 'object' ? 0.0 : '""';
    }
  });
  return newObj;
}, {});

const prepOrderOfColumns = cols => table => table.select(cols);

const getArrowSchemaFromFields = fields => fields.reduce((p, f) => {
  const { name, type } = f;
  // eslint-disable-next-line no-param-reassign
  p[name] = type;
  const { children } = type;
  if (children) {
    // eslint-disable-next-line no-param-reassign
    p[name] = getArrowSchemaFromFields(children);
  }
  return p;
}, {});

// eslint-disable-next-line no-shadow
const getArrowSchema = arrow => {
  const schema = getArrowSchemaFromFields(arrow.schema.fields);
  return schema;
};

const getStructColumns = allArrow => {
  const mergedSchema = allArrow.reduce((previous, table) => {
    const schema = getArrowSchema(table);

    // perform deep merge on keys or nest objects schema and previous
    const merged = Object.keys(schema).reduce((p, c) => {
      if (previous[c]) {
        if (typeof (previous[c]) === 'object') {
          // eslint-disable-next-line no-param-reassign
          p[c] = Object.assign(previous[c], schema[c]);
        } else {
          // eslint-disable-next-line no-param-reassign
          p[c] = previous[c];
        }
      } else {
        // eslint-disable-next-line no-param-reassign
        p[c] = schema[c];
      }
      return p;
    }, {});

    return merged;
  }, {});

  const structColumns = Object.entries(mergedSchema).reduce((p, [k, v]) => {
    if (String(v) === '[object Object]') {
      // eslint-disable-next-line no-param-reassign
      p[k] = v;
    }
    return p;
  }, {});

  return structColumns;
};

const modifyTableStructs = structColumns => table => {
  const defaultValues = new Float64Array(table.numRows);
  const defaultVector = makeVector(defaultValues);
  const defaultData = defaultVector.data[0];

  // these are the current columns that are structs
  const structEntries = Object.entries(structColumns);

  const modifiedBatch = structEntries.reduce((batch, [column, structFieldTypes]) => {
    // eslint-disable-next-line no-unused-vars
    const { schema, data } = batch;
    const { fields } = schema;

    const currentIndex = fields.findIndex(f => f.name === column);
    const currentField = fields[currentIndex];

    const { type: { children } } = currentField;

    // find and augment the missing fields associated with the struct
    const newChildren = Object.entries(structFieldTypes)
      .map(([fieldName]) => {
        const index = children.findIndex(x => x.name === fieldName);
        if (index > -1) return children[index];

        return new arrow.Field(fieldName, defaultVector.type);
      });

    currentField.type.children = newChildren;

    const childrenData = batch.data.children[currentIndex].children;

    const newData = Object.entries(structFieldTypes)
      .map(([fieldName]) => {
        const index = children.findIndex(x => x.name === fieldName);
        if (index > -1) return childrenData[index];

        return defaultData;
      });

    // eslint-disable-next-line no-param-reassign
    batch.data.children[currentIndex].children = newData;

    return batch;
  }, table.batches[0]);

  return new arrow.Table(modifiedBatch);
};

const normalizeStructs = (structColumns, allArrow) => {
  const allTables = allArrow.map(modifyTableStructs(structColumns));

  return allTables;
};

const normalizeColumns = (columnDefaults, allTables) => {
  const allColumns = Object.keys(columnDefaults);

  return allTables
    .map(t => {
      const missingColumns = findMissingColumns(t, allColumns);
      if (missingColumns.length === 0) return t;

      return createColumnsWithDefaults(t, missingColumns, columnDefaults);
    })
    .map(prepOrderOfColumns(allColumns))
    .map(HACK_renameColumnsIfExist);
};

export const arrowBuffersToArquero = (dataset, arrowBuffers) => {
  console.time(`${dataset}-convert-to-arq`); // eslint-disable-line no-console
  const converted = arrowBuffers.map(row => {
    const { date } = row;

    const arq = aq.fromArrow(row.data)
      .derive({
        cache_from: `d => "${date}"`,
        date: `d => "${date}"`,
      });

    return arq;
  });
  console.timeEnd(`${dataset}-convert-to-arq`); // eslint-disable-line no-console
  return converted
    .filter(t => t.size > 0);
};

export const concatArqThroughArrow = (dataset, allTables) => {
  console.time(`${dataset}-concat`); // eslint-disable-line no-console

  const columnDefaults = getColumnDefaults(allTables);
  const normalizedTables = normalizeColumns(columnDefaults, allTables);
  const arrowTables = normalizedTables.map(t => t.toArrow({ types }));

  const structColumns = getStructColumns(arrowTables);
  const allArrow = normalizeStructs(structColumns, arrowTables);

  const [baseAsArrow, ...asArrow] = allArrow;

  const concat = baseAsArrow.concat(...asArrow);
  const arch = aq.fromArrow(concat);
  console.timeEnd(`${dataset}-concat`); // eslint-disable-line no-console

  return arch;
};
