Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix plugin pipeline race condition #1121

Merged
merged 4 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/giant-monkeys-count.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@segment/analytics-next': patch
---

Fix enrichment plugins not waiting for .load to resolve when plugin is registered manually
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
import { CorePlugin, PluginType, sleep } from '@segment/analytics-core'
import { getBufferedPageCtxFixture } from '../../test-helpers/fixtures'
import {
createMockFetchImplementation,
createRemotePlugin,
getBufferedPageCtxFixture,
} from '../../test-helpers/fixtures'
import unfetch from 'unfetch'
import { AnalyticsBrowser } from '..'
import { Analytics } from '../../core/analytics'
import { createSuccess } from '../../test-helpers/factories'
import { createDeferred } from '@segment/analytics-generic-utils'
import { PluginFactory } from '../../plugins/remote-loader'

const nextTickP = () => new Promise((r) => setTimeout(r, 0))

jest.mock('unfetch')

const mockFetchSettingsSuccessResponse = () => {
return jest
.mocked(unfetch)
.mockImplementation(() => createSuccess({ integrations: {} }))
}
beforeEach(() => {
document.head.innerHTML = `
<script id="initial"></script>`.trim()
})

describe('Lazy initialization', () => {
let trackSpy: jest.SpiedFunction<Analytics['track']>
let fetched: jest.MockedFn<typeof unfetch>
beforeEach(() => {
fetched = mockFetchSettingsSuccessResponse()
fetched = jest
.mocked(unfetch)
.mockImplementation(createMockFetchImplementation())
trackSpy = jest.spyOn(Analytics.prototype, 'track')
})

Expand Down Expand Up @@ -56,7 +60,10 @@ describe('Lazy initialization', () => {
const createTestPluginFactory = (name: string, type: PluginType) => {
const lock = createDeferred<void>()
const load = createDeferred<void>()
const trackSpy = jest.fn()
const trackSpy = jest.fn().mockImplementation((ctx) => {
ctx.event.context!.ran = true
return ctx
})

const factory: PluginFactory = () => {
return {
Expand All @@ -83,91 +90,158 @@ const createTestPluginFactory = (name: string, type: PluginType) => {

describe('Lazy destination loading', () => {
beforeEach(() => {
jest.mock('unfetch')
jest.mocked(unfetch).mockImplementation(() =>
createSuccess({
integrations: {},
jest.mocked(unfetch).mockImplementation(
createMockFetchImplementation({
integrations: {
braze: {},
google: {},
},
remotePlugins: [
{
name: 'braze',
libraryName: 'braze',
},
{
name: 'google',
libraryName: 'google',
},
createRemotePlugin('braze'),
createRemotePlugin('google'),
],
})
)
})

afterAll(() => jest.resetAllMocks())

it('loads destinations in the background', async () => {
const testEnrichmentHarness = createTestPluginFactory(
'enrichIt',
'enrichment'
)
const dest1Harness = createTestPluginFactory('braze', 'destination')
const dest2Harness = createTestPluginFactory('google', 'destination')
describe('critical plugins (plugins that block the event pipeline)', () => {
test('pipeline _will_ wait for *enrichment* plugins to load', async () => {
jest.mocked(unfetch).mockImplementation(
createMockFetchImplementation({
remotePlugins: [],
})
)
const testEnrichmentHarness = createTestPluginFactory(
'enrichIt',
'enrichment'
)

const analytics = new AnalyticsBrowser()
const analytics = new AnalyticsBrowser()

const testEnrichmentPlugin = testEnrichmentHarness.factory(
null
) as CorePlugin
const testPlugin = testEnrichmentHarness.factory(null) as CorePlugin

analytics.register(testEnrichmentPlugin).catch(() => {})
analytics.register(testPlugin).catch(() => {})
analytics.track('test event 1').catch(() => {})

await analytics.load({
writeKey: 'abc',
plugins: [dest1Harness.factory, dest2Harness.factory],
const analyticsLoaded = analytics.load({
writeKey: 'abc',
plugins: [],
})

expect(testEnrichmentHarness.trackSpy).not.toHaveBeenCalled()

// now we'll let the enrichment plugin load
testEnrichmentHarness.loadingGuard.resolve()

await analyticsLoaded
await sleep(200)
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(1)
})

test('pipeline _will_ wait for *before* plugins to load', async () => {
jest.mocked(unfetch).mockImplementation(
createMockFetchImplementation({
remotePlugins: [],
})
)
const testBeforeHarness = createTestPluginFactory('enrichIt', 'before')

const analytics = new AnalyticsBrowser()

const testPlugin = testBeforeHarness.factory(null) as CorePlugin

analytics.register(testPlugin).catch(() => {})
analytics.track('test event 1').catch(() => {})

const analyticsLoaded = analytics.load({
writeKey: 'abc',
plugins: [],
})

expect(testBeforeHarness.trackSpy).not.toHaveBeenCalled()

// now we'll let the before plugin load
testBeforeHarness.loadingGuard.resolve()

await analyticsLoaded
await sleep(200)
expect(testBeforeHarness.trackSpy).toHaveBeenCalledTimes(1)
})
})

// we won't hold enrichment plugin from loading since they are not lazy loaded
testEnrichmentHarness.loadingGuard.resolve()
// and we'll also let one destination load so we can assert some behaviours
dest1Harness.loadingGuard.resolve()
describe('non-critical plugins (plugins that do not block the event pipeline)', () => {
it('destination loading does not block the event pipeline, but enrichment plugins do', async () => {
const testEnrichmentHarness = createTestPluginFactory(
'enrichIt',
'enrichment'
)
const dest1Harness = createTestPluginFactory('braze', 'destination')
const dest2Harness = createTestPluginFactory('google', 'destination')

await testEnrichmentHarness.loadPromise
await dest1Harness.loadPromise
const analytics = new AnalyticsBrowser()

analytics.track('test event 1').catch(() => {})
const testEnrichmentPlugin = testEnrichmentHarness.factory(
null
) as CorePlugin

// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless
analytics.register(testEnrichmentPlugin).catch(() => {})

await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(1)
const p = analytics.load({
writeKey: 'abc',
plugins: [dest1Harness.factory, dest2Harness.factory],
})

await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(1)
// we won't hold enrichment plugin from loading since they are not lazy loaded
testEnrichmentHarness.loadingGuard.resolve()
await p
// and we'll also let one destination load so we can assert some behaviours
dest1Harness.loadingGuard.resolve()

// now we'll send another event
analytics.track('test event 1').catch(() => {})

analytics.track('test event 2').catch(() => {})
// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless

// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless
await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(1)

await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(2)
await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(1)
expect(dest1Harness.trackSpy.mock.calls[0][0].event.context.ran).toBe(
true
)

await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(2)
// now we'll send another event

// this whole time the other destination was not engaged with at all
expect(dest2Harness.trackSpy).not.toHaveBeenCalled()
analytics.track('test event 2').catch(() => {})

// now "after some time" the other destination will load
dest2Harness.loadingGuard.resolve()
await dest2Harness.loadPromise
// even though there's one destination that still hasn't loaded, the next assertions
// prove that the event pipeline is flowing regardless

// and now that it is "online" - the previous events that it missed will be handed over
await nextTickP()
expect(dest2Harness.trackSpy).toHaveBeenCalledTimes(2)
})
await nextTickP()
expect(testEnrichmentHarness.trackSpy).toHaveBeenCalledTimes(2)

await nextTickP()
expect(dest1Harness.trackSpy).toHaveBeenCalledTimes(2)

// this whole time the other destination was not engaged with at all
expect(dest2Harness.trackSpy).not.toHaveBeenCalled()

// now "after some time" the other destination will load
dest2Harness.loadingGuard.resolve()
await dest2Harness.loadPromise

// and now that it is "online" - the previous events that it missed will be handed over
await nextTickP()
expect(dest2Harness.trackSpy).toHaveBeenCalledTimes(2)

// should not add any other script tags
expect(document.querySelectorAll('script').length).toBe(1)
expect(document.getElementsByTagName('script')[0].id).toBe('initial')
})
})
it('emits initialize regardless of whether all destinations have loaded', async () => {
const dest1Harness = createTestPluginFactory('braze', 'destination')
const dest2Harness = createTestPluginFactory('google', 'destination')
Expand Down
2 changes: 2 additions & 0 deletions packages/browser/src/browser/__tests__/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,8 @@ describe('register', () => {
...googleAnalytics,
load: () => Promise.resolve('foo'),
})

await analytics
const goodPluginSpy = jest.spyOn(goodPlugin, 'track')

await analytics.register(goodPlugin, errorPlugin)
Expand Down
38 changes: 20 additions & 18 deletions packages/browser/src/browser/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import {
flushSetAnonymousID,
flushOn,
PreInitMethodCall,
flushRegister,
} from '../core/buffer'
import { ClassicIntegrationSource } from '../plugins/ajs-destination/types'
import { attachInspector } from '../core/inspector'
Expand Down Expand Up @@ -208,8 +209,6 @@ async function flushFinalBuffer(
// analytics calls during async function calls.
await flushAddSourceMiddleware(analytics, buffer)
flushAnalyticsCallsInNewTask(analytics, buffer)
// Clear buffer, just in case analytics is loaded twice; we don't want to fire events off again.
buffer.clear()
}

async function registerPlugins(
Expand All @@ -218,9 +217,11 @@ async function registerPlugins(
analytics: Analytics,
options: InitOptions,
pluginLikes: (Plugin | PluginFactory)[] = [],
legacyIntegrationSources: ClassicIntegrationSource[]
legacyIntegrationSources: ClassicIntegrationSource[],
preInitBuffer: PreInitMethodCallBuffer
): Promise<Context> {
const plugins = pluginLikes?.filter(
flushPreBuffer(analytics, preInitBuffer)
const pluginsFromSettings = pluginLikes?.filter(
(pluginLike) => typeof pluginLike === 'object'
) as Plugin[]

Expand Down Expand Up @@ -280,15 +281,10 @@ async function registerPlugins(
pluginSources
).catch(() => [])

const toRegister = [
envEnrichment,
...plugins,
...legacyDestinations,
...remotePlugins,
]
const basePlugins = [envEnrichment, ...legacyDestinations, ...remotePlugins]

if (schemaFilter) {
toRegister.push(schemaFilter)
basePlugins.push(schemaFilter)
}

const shouldIgnoreSegmentio =
Expand All @@ -297,7 +293,7 @@ async function registerPlugins(
(options.integrations && options.integrations['Segment.io'] === false)

if (!shouldIgnoreSegmentio) {
toRegister.push(
basePlugins.push(
await segmentio(
analytics,
mergedSettings['Segment.io'] as SegmentioSettings,
Expand All @@ -306,7 +302,15 @@ async function registerPlugins(
)
}

const ctx = await analytics.register(...toRegister)
// order is important here, (for example, if there are multiple enrichment plugins, the last registered plugin will have access to the last context.)
const ctx = await analytics.register(
// register 'core' plugins and those via destinations
...basePlugins,
// register user-defined plugins passed into AnalyticsBrowser.load({ plugins: [plugin1, plugin2] }) -- relevant to npm-only
...pluginsFromSettings
)
// register user-defined plugins registered via analytics.register()
await flushRegister(analytics, preInitBuffer)

if (
Object.entries(cdnSettings.enabledMiddleware ?? {}).some(
Expand Down Expand Up @@ -348,7 +352,7 @@ async function loadAnalytics(

if (options.initialPageview) {
// capture the page context early, so it's always up-to-date
preInitBuffer.push(new PreInitMethodCall('page', []))
preInitBuffer.add(new PreInitMethodCall('page', []))
}

let cdnSettings =
Expand Down Expand Up @@ -393,16 +397,14 @@ async function loadAnalytics(
protocol: segmentLoadOptions?.protocol,
})

// needs to be flushed before plugins are registered
flushPreBuffer(analytics, preInitBuffer)

const ctx = await registerPlugins(
settings.writeKey,
cdnSettings,
analytics,
options,
plugins,
classicIntegrations
classicIntegrations,
preInitBuffer
)

const search = window.location.search ?? ''
Expand Down
Loading
Loading