import { eventChannel } from 'redux-saga'
import PouchDB from 'pouchdb-browser'
import { put, take, call, spawn, select, takeEvery } from 'redux-saga/effects'

import { pending, error, done } from '../progress/progressActions'
import progressSaga from '../progress/progressSaga'
import { setDocument, setDocuments, removeDocument } from '../documents/documentActions'
import { getDocumentById } from '../documents/documentReducer'
import { LOGIN_RESPONSE, LOGOUT } from '../../state/auth/authActions'
import { INITIAL_SYNC, SYNCING } from './syncActions'
import { syncError } from '../../utils/error'
import config from '../../utils/config'

const createReplicatorEventChannel = (replicator) => {
  const init = ((replicator) => {
    let once = false
    const emitters = []
    return (emitter) => {
      if (!once && !emitters.includes(emitter)) {
        emitters.push(emitter)
        once = true
        emitter({ type: 'init', event: null })
      }
    }
  })()
  return eventChannel((emitter) => {
    replicator.on('denied', function (err) {
      emitter({ type: 'denied', event: err })
    }).on('error', function (err) {
      emitter({ type: 'error', event: err })
    }).on('paused', function () {
      init(emitter)
      emitter({ type: 'paused', event: null })
    }).on('complete', function (info) {
      emitter({ type: 'complete', event: info })
    }).on('active', function (direction) {
      init(emitter)
      emitter({ type: 'active', event: direction })
    }).on('change', function (change) {
      init(emitter)
      emitter({ type: 'change', event: change })
    })
    return () => { replicator.removeAllListeners() }
  })
}

const watchChannel = function* (db, channel) {
  while (true) {
    const { type, event } = yield take(channel)
    switch (type) {
      case 'denied': {
        const { reason, id, type, docError } = event.doc
        const message = `${id} ${docError}: ${reason}`
        console.error({ pouchDBResponse: event }, message)
        const doc = yield db.get(id)
        yield put(removeDocument(doc.type, doc._id))
        yield put(error(docError, ...[SYNCING, type, id]))
        break
      }
      case 'error': {
        const message = 'Error with syncing'
        console.error({ pouchDBResponse: event }, message)
        yield put(syncError(event))
        break
      }
      case 'paused': {
        yield put(done(SYNCING))
        break
      }
      case 'init': {
        yield put(pending(true, SYNCING))
        break
      }
      case 'complete': {
        yield put(done(SYNCING))
        break
      } case 'active': {
        yield put(pending(true, SYNCING))
        break
      } case 'change': {
        yield put(pending(true, SYNCING))
        const { change } = event
        if (change.ok) {
          const lastChanges = Object.values(change.docs.reduce((grouped, change) => {
            if (!grouped[change._id]) {
              grouped[change._id] = []
            }
            const group = grouped[change._id]
            group.push(change)
            return grouped
          }, {})).reduce((last, changes) => {
            const getRevNumber = (rev) => parseInt(rev.split('-')[0], 10)
            changes.sort((lhs, rhs) => getRevNumber(rhs._rev) - getRevNumber(lhs._rev))
            last.push(changes[0])
            return last
          }, [])

          for (const doc of lastChanges) {
            const currentDoc = yield select(getDocumentById(doc._id))
            if (doc && currentDoc &&
              doc._id && doc._id === currentDoc._id &&
              doc._rev && doc._rev === currentDoc._rev) {
              break
            } else if (doc.type) {
              yield put(pending(true, SYNCING, doc.type, doc._id))
              delete doc._revisions
              yield put(setDocument(doc))
              yield put(done(SYNCING, doc.type, doc._id))
            } else if (doc._deleted) {
              if (currentDoc) {
                yield put(removeDocument(currentDoc))
              }
            }
          }
        }
        break
      }
    }
  }
}

function* getLastUpdateSeq(db) {
  const info = yield call([db, db.info])
  return info.update_seq
}

function* replicateAllOnce(local, remote, options) {
  const { since, filter = () => { } } = options
  if (since) { // logged in before, means stuff already in local db...
    // Get docs once
    yield PouchDB.replicate(remote, local, {
      since,
      filter
    })
    const { rows } = yield call([local, local.allDocs], { include_docs: true })
    const docs = rows
      .map((row) => row.doc)
      .filter(filter)
    yield put(setDocuments(docs))
  } else { // first log in so no data...
    const { rows } = yield call([remote, remote.allDocs], { include_docs: true })
    const docs = rows
      .map((row) => row.doc)
      .filter(filter)
    const result = yield call([local, local.bulkDocs], docs, { new_edits: false })
    const errors = result.filter((doc) => doc.error)
    if (errors && errors.length) {
      yield put(error(errors, ...[SYNCING]))
    }
    yield put(setDocuments(docs))
  }
}

function* sync(local, remote, options) {
  return local.sync(remote, Object.assign({}, {
    live: true,
    retry: true,
    heartbeat: 10000
  }, options))
}

function* tryGetDocument(db, id) {
  let doc = null
  try {
    doc = yield call([db, db.get], id)
  } catch (err) {
    if (err.name !== 'not_found') {
      throw err
    }
  }
  return doc
}

function* startLiveReplication(local, { remoteUrl, filter, since }) {
  const remote = new PouchDB(remoteUrl, {
    skip_setup: true,
    auto_compaction: true,
    fetch: (url, opts) => {
      opts.credentials = 'include'
      return PouchDB.fetch(url, opts)
    }
  })
  yield* replicateAllOnce(local, remote, {
    since,
    filter
  })
  const { update_seq } = yield call([remote, remote.info])
  const replicator = yield* sync(local, remote, {
    since: 'now',
    filter
  })
  const channel = createReplicatorEventChannel(replicator)

  return { from: update_seq, replicator, channel }
}

export const sagas = {
  [INITIAL_SYNC]: progressSaga(() => [INITIAL_SYNC],
    function* (db) {
      const replicators = []
      const channels = []

      let loginDoc = yield call(tryGetDocument, db, config.localLoginDocId)
      if (!loginDoc) {
        loginDoc = {
          couchUrl: config.couchUrl,
          lastUpdateSeqs: {},
          db: config.db,
          _id: config.localLoginDocId
        }
      }
            
      const { from: adminFrom, replicator: adminReplicator, channel: adminChannel } = yield* startLiveReplication(db, {
        remoteUrl: `${loginDoc.couchUrl}/${loginDoc.db}`,
        filter: (doc) => !(doc._id.startsWith('org.couchdb.user') || doc._id.startsWith('_design')),
        since: loginDoc.lastUpdateSeqs[loginDoc.db]
      })      
      loginDoc.lastUpdateSeqs[loginDoc.db] = adminFrom
      replicators.push(adminReplicator)
      channels.push(adminChannel)

      const { from: usersFrom, replicator: usersReplicator, channel: usersChannel } = yield* startLiveReplication(db, {
        remoteUrl: `${loginDoc.couchUrl}/_users`,
        filter: (doc) => doc._id.startsWith('org.couchdb.user') && !doc._id.startsWith('_design'),
        since: loginDoc.lastUpdateSeqs['_users']
      })      
      loginDoc.lastUpdateSeqs['_users'] = usersFrom
      replicators.push(usersReplicator)
      channels.push(usersChannel)
     
      yield call([db, db.put], loginDoc)

      for (const channel of channels) {
        yield spawn(watchChannel, db, channel)
      }

      yield spawn(function* () {
        yield take(LOGOUT, function* () {
          yield Promise.all(replicators.map((replicator) => replicator.cancel()))
          yield Promise.all(channels.map((channel) => channel.stop()))
        })
      })
    })
}

export default function* syncSaga(db) {
  yield takeEvery(LOGIN_RESPONSE, sagas[INITIAL_SYNC], db)
}
