Skip to content

Commit

Permalink
[sync] Split sync/impl into smaller chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
radex committed Dec 17, 2018
1 parent 422b0e3 commit 6516e5b
Show file tree
Hide file tree
Showing 7 changed files with 176 additions and 142 deletions.
145 changes: 8 additions & 137 deletions src/sync/impl.js → src/sync/impl/applyRemote.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,45 +4,22 @@ import {
// $FlowFixMe
promiseAllObject,
map,
reduce,
contains,
values,
pipe,
filter,
find,
equals,
// $FlowFixMe
piped,
} from 'rambdax'
import { allPromises, unnest } from '../utils/fp'
import { logError, invariant } from '../utils/common'
import type { Database, RecordId, Collection, Model, TableName, DirtyRaw } from '..'
import * as Q from '../QueryDescription'
import { columnName } from '../Schema'
import { allPromises, unnest } from '../../utils/fp'
import { logError, invariant } from '../../utils/common'
import type { Database, RecordId, Collection, Model, TableName, DirtyRaw } from '../..'
import * as Q from '../../QueryDescription'
import { columnName } from '../../Schema'

import { prepareMarkAsSynced, prepareCreateFromRaw, prepareUpdateFromRaw } from './syncHelpers'
import type { SyncTableChangeSet, SyncDatabaseChangeSet, Timestamp } from './index'

export type SyncLocalChanges = $Exact<{ changes: SyncDatabaseChangeSet, affectedRecords: Model[] }>

const lastSyncedAtKey = '__watermelon_last_pulled_at'

export async function getLastPulledAt(database: Database): Promise<?Timestamp> {
return parseInt(await database.adapter.getLocal(lastSyncedAtKey), 10) || null
}

export async function setLastPulledAt(database: Database, timestamp: Timestamp): Promise<void> {
await database.adapter.setLocal(lastSyncedAtKey, `${timestamp}`)
}

export function ensureActionsEnabled(database: Database): void {
invariant(
database._actionsEnabled,
'[Sync] To use Sync, Actions must be enabled. Pass `{ actionsEnabled: true }` to Database constructor — see docs for more details',
)
}

// *** Applying remote changes ***
import type { SyncTableChangeSet, SyncDatabaseChangeSet } from '../index'
import { prepareCreateFromRaw, prepareUpdateFromRaw, ensureActionsEnabled } from './helpers'

const getIds = map(({ id }) => id)
const idsForChanges = ({ created, updated, deleted }: SyncTableChangeSet): RecordId[] => [
Expand Down Expand Up @@ -187,7 +164,7 @@ const prepareApplyAllRemoteChanges = (db: Database, recordsToApply: AllRecordsTo

const destroyPermanently = record => record.destroyPermanently()

export function applyRemoteChanges(
export default function applyRemoteChanges(
db: Database,
remoteChanges: SyncDatabaseChangeSet,
): Promise<void> {
Expand All @@ -203,109 +180,3 @@ export function applyRemoteChanges(
])
}, 'sync-applyRemoteChanges')
}

// *** Fetching local changes ***

const notSyncedQuery = Q.where(columnName('_status'), Q.notEq('synced'))
// TODO: It would be best to omit _status, _changed fields, since they're not necessary for the server
// but this complicates markLocalChangesAsDone, since we don't have the exact copy to compare if record changed
// TODO: It would probably also be good to only send to server locally changed fields, not full records
const rawsForStatus = (status, records) =>
reduce(
(raws, record) => (record._raw._status === status ? raws.concat({ ...record._raw }) : raws),
[],
records,
)

async function fetchLocalChangesForCollection<T: Model>(
collection: Collection<T>,
): Promise<[SyncTableChangeSet, T[]]> {
const changedRecords = await collection.query(notSyncedQuery).fetch()
const changeSet = {
created: rawsForStatus('created', changedRecords),
updated: rawsForStatus('updated', changedRecords),
deleted: await collection.database.adapter.getDeletedRecords(collection.table),
}
return [changeSet, changedRecords]
}

const extractChanges = map(([changeSet]) => changeSet)
const extractAllAffectedRecords = pipe(
values,
map(([, records]) => records),
unnest,
)

export function fetchLocalChanges(db: Database): Promise<SyncLocalChanges> {
ensureActionsEnabled(db)
return db.action(async () => {
const changes = await promiseAllObject(
map(
fetchLocalChangesForCollection,
// $FlowFixMe
db.collections.map,
),
)
// TODO: deep-freeze changes object (in dev mode only) to detect mutations (user bug)
return {
// $FlowFixMe
changes: extractChanges(changes),
affectedRecords: extractAllAffectedRecords(changes),
}
}, 'sync-fetchLocalChanges')
}

// *** Mark local changes as synced ***

const unchangedRecordsForRaws = (raws, recordCache) =>
reduce(
(records, raw) => {
const record = recordCache.find(model => model.id === raw.id)
if (!record) {
logError(
`[Sync] Looking for record ${
raw.id
} to mark it as synced, but I can't find it. Will ignore it (it should get synced next time). This is probably a Watermelon bug — please file an issue!`,
)
return records
}

// only include if it didn't change since fetch
// TODO: get rid of `equals`
return equals(record._raw, raw) ? records.concat(record) : records
},
[],
raws,
)

const recordsToMarkAsSynced = ({ changes, affectedRecords }: SyncLocalChanges): Model[] =>
pipe(
values,
map(({ created, updated }) =>
unchangedRecordsForRaws([...created, ...updated], affectedRecords),
),
unnest,
)(changes)

const destroyDeletedRecords = (db: Database, { changes }: SyncLocalChanges): Promise<*> =>
promiseAllObject(
map(
({ deleted }, tableName) => db.adapter.destroyDeletedRecords(tableName, deleted),
// $FlowFixMe
changes,
),
)

export function markLocalChangesAsSynced(
db: Database,
syncedLocalChanges: SyncLocalChanges,
): Promise<void> {
ensureActionsEnabled(db)
return db.action(async () => {
// update and destroy records concurrently
await Promise.all([
db.batch(...map(prepareMarkAsSynced, recordsToMarkAsSynced(syncedLocalChanges))),
destroyDeletedRecords(db, syncedLocalChanges),
])
}, 'sync-markLocalChangesAsSynced')
}
68 changes: 68 additions & 0 deletions src/sync/impl/fetchLocal.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// @flow

import {
// $FlowFixMe
promiseAllObject,
map,
reduce,
values,
pipe,
} from 'rambdax'
import { unnest } from '../../utils/fp'
import type { Database, Collection, Model } from '../..'
import * as Q from '../../QueryDescription'
import { columnName } from '../../Schema'

import type { SyncTableChangeSet, SyncDatabaseChangeSet } from '../index'
import { ensureActionsEnabled } from './helpers'

export type SyncLocalChanges = $Exact<{ changes: SyncDatabaseChangeSet, affectedRecords: Model[] }>

const notSyncedQuery = Q.where(columnName('_status'), Q.notEq('synced'))
// TODO: It would be best to omit _status, _changed fields, since they're not necessary for the server
// but this complicates markLocalChangesAsDone, since we don't have the exact copy to compare if record changed
// TODO: It would probably also be good to only send to server locally changed fields, not full records
const rawsForStatus = (status, records) =>
reduce(
(raws, record) => (record._raw._status === status ? raws.concat({ ...record._raw }) : raws),
[],
records,
)

async function fetchLocalChangesForCollection<T: Model>(
collection: Collection<T>,
): Promise<[SyncTableChangeSet, T[]]> {
const changedRecords = await collection.query(notSyncedQuery).fetch()
const changeSet = {
created: rawsForStatus('created', changedRecords),
updated: rawsForStatus('updated', changedRecords),
deleted: await collection.database.adapter.getDeletedRecords(collection.table),
}
return [changeSet, changedRecords]
}

const extractChanges = map(([changeSet]) => changeSet)
const extractAllAffectedRecords = pipe(
values,
map(([, records]) => records),
unnest,
)

export default function fetchLocalChanges(db: Database): Promise<SyncLocalChanges> {
ensureActionsEnabled(db)
return db.action(async () => {
const changes = await promiseAllObject(
map(
fetchLocalChangesForCollection,
// $FlowFixMe
db.collections.map,
),
)
// TODO: deep-freeze changes object (in dev mode only) to detect mutations (user bug)
return {
// $FlowFixMe
changes: extractChanges(changes),
affectedRecords: extractAllAffectedRecords(changes),
}
}, 'sync-fetchLocalChanges')
}
13 changes: 10 additions & 3 deletions src/sync/syncHelpers.js → src/sync/impl/helpers.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
// @flow

import { logError } from '../utils/common'
import { logError, invariant } from '../../utils/common'

import type { Model, Collection } from '..'
import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../RawRecord'
import type { Model, Collection, Database } from '../..'
import { type RawRecord, type DirtyRaw, sanitizedRaw } from '../../RawRecord'

// Returns raw record with naive solution to a conflict based on local `_changed` field
// This is a per-column resolution algorithm. All columns that were changed locally win
Expand Down Expand Up @@ -67,3 +67,10 @@ export function prepareMarkAsSynced<T: Model>(record: T): T {
replaceRaw(record, newRaw)
})
}

export function ensureActionsEnabled(database: Database): void {
invariant(
database._actionsEnabled,
'[Sync] To use Sync, Actions must be enabled. Pass `{ actionsEnabled: true }` to Database constructor — see docs for more details',
)
}
18 changes: 18 additions & 0 deletions src/sync/impl/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// @flow

import type { Database } from '../..'
import type { Timestamp } from '../index'

export { default as applyRemoteChanges } from './applyRemote'
export { default as fetchLocalChanges } from './fetchLocal'
export { default as markLocalChangesAsSynced } from './markAsSynced'

const lastSyncedAtKey = '__watermelon_last_pulled_at'

export async function getLastPulledAt(database: Database): Promise<?Timestamp> {
return parseInt(await database.adapter.getLocal(lastSyncedAtKey), 10) || null
}

export async function setLastPulledAt(database: Database, timestamp: Timestamp): Promise<void> {
await database.adapter.setLocal(lastSyncedAtKey, `${timestamp}`)
}
70 changes: 70 additions & 0 deletions src/sync/impl/markAsSynced.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// @flow

import {
// $FlowFixMe
promiseAllObject,
map,
reduce,
values,
pipe,
equals,
} from 'rambdax'
import { unnest } from '../../utils/fp'
import { logError } from '../../utils/common'
import type { Database, Model } from '../..'

import { prepareMarkAsSynced, ensureActionsEnabled } from './helpers'
import type { SyncLocalChanges } from './fetchLocal'

const unchangedRecordsForRaws = (raws, recordCache) =>
reduce(
(records, raw) => {
const record = recordCache.find(model => model.id === raw.id)
if (!record) {
logError(
`[Sync] Looking for record ${
raw.id
} to mark it as synced, but I can't find it. Will ignore it (it should get synced next time). This is probably a Watermelon bug — please file an issue!`,
)
return records
}

// only include if it didn't change since fetch
// TODO: get rid of `equals`
return equals(record._raw, raw) ? records.concat(record) : records
},
[],
raws,
)

const recordsToMarkAsSynced = ({ changes, affectedRecords }: SyncLocalChanges): Model[] =>
pipe(
values,
map(({ created, updated }) =>
unchangedRecordsForRaws([...created, ...updated], affectedRecords),
),
unnest,
)(changes)

const destroyDeletedRecords = (db: Database, { changes }: SyncLocalChanges): Promise<*> =>
promiseAllObject(
map(
({ deleted }, tableName) => db.adapter.destroyDeletedRecords(tableName, deleted),
// $FlowFixMe
changes,
),
)

export default function markLocalChangesAsSynced(
db: Database,
syncedLocalChanges: SyncLocalChanges,
): Promise<void> {
ensureActionsEnabled(db)
return db.action(async () => {
// update and destroy records concurrently
await Promise.all([
db.batch(...map(prepareMarkAsSynced, recordsToMarkAsSynced(syncedLocalChanges))),
destroyDeletedRecords(db, syncedLocalChanges),
])
}, 'sync-markLocalChangesAsSynced')
}
2 changes: 1 addition & 1 deletion src/sync/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import {
markLocalChangesAsSynced,
getLastPulledAt,
setLastPulledAt,
ensureActionsEnabled,
} from './impl'
import { ensureActionsEnabled } from './impl/helpers'

export type Timestamp = number

Expand Down
2 changes: 1 addition & 1 deletion src/sync/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
applyRemoteChanges,
getLastPulledAt,
} from './impl'
import { resolveConflict } from './syncHelpers'
import { resolveConflict } from './impl/helpers'

describe('Conflict resolution', () => {
it('can resolve per-column conflicts', () => {
Expand Down

0 comments on commit 6516e5b

Please sign in to comment.