From 782d93b9b030a0a16191de58ec7c6920982f248a Mon Sep 17 00:00:00 2001 From: UncleGedd <42304551+UncleGedd@users.noreply.github.com> Date: Wed, 9 Oct 2024 18:21:07 -0500 Subject: [PATCH] feat(api): adds caching layer to pepr endpoints (#402) Co-authored-by: decleaver <85503726+decleaver@users.noreply.github.com> --- pkg/api/monitor/cache.go | 101 ++++++++++++++++++ pkg/api/monitor/pepr.go | 24 ++++- tasks/test.yaml | 9 +- .../monitor/pepr/[[stream]]/+page.svelte | 91 ++-------------- .../routes/monitor/pepr/[[stream]]/helpers.ts | 97 ++++++++++++++++- ui/tests/monitor.spec.ts | 33 +++++- 6 files changed, 259 insertions(+), 96 deletions(-) create mode 100644 pkg/api/monitor/cache.go diff --git a/pkg/api/monitor/cache.go b/pkg/api/monitor/cache.go new file mode 100644 index 00000000..f0ab4d9f --- /dev/null +++ b/pkg/api/monitor/cache.go @@ -0,0 +1,101 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: 2024-Present The UDS Authors + +package monitor + +import ( + "bytes" + "net/http" + "sync" + "time" + + "github.com/defenseunicorns/uds-runtime/pkg/api/rest" + "github.com/zarf-dev/zarf/src/pkg/message" +) + +// single instance of the pepr stream cache +var streamCache = NewCache() + +// Cache is a simple cache for pepr stream data, it can be invalidated by a timer and max size +type Cache struct { + buffer *bytes.Buffer + lock sync.RWMutex + timer *time.Timer + maxSize int +} + +// NewCache creates a new cache for pepr stream data +func NewCache() *Cache { + c := &Cache{ + buffer: &bytes.Buffer{}, + maxSize: 1024 * 1024 * 10, // 10MB + } + c.startResetTimer() + return c +} + +// startResetTimer starts a timer that resets the cache after 5 minutes +func (c *Cache) startResetTimer() { + c.timer = time.AfterFunc(5*time.Minute, func() { + message.Debug("Pepr cache invalidated by timer, resetting cache") + c.Reset() + c.startResetTimer() // restart timer + }) +} + +// Reset resets the cache +func (c *Cache) Reset() { + c.lock.Lock() + defer c.lock.Unlock() + message.Debug("Resetting pepr cache") + c.buffer.Reset() +} + +// Stop stops the cache timer +func (c *Cache) Stop() { + if c.timer != nil { + message.Debugf("Stopping pepr cache timer") + c.timer.Stop() + } +} + +// Get returns a deep copy of cached buffer +func (c *Cache) Get() *bytes.Buffer { + c.lock.RLock() + defer c.lock.RUnlock() + + if c.buffer == nil { + return nil + } + + return bytes.NewBuffer(c.buffer.Bytes()) +} + +// Set sets the cached buffer +func (c *Cache) Set(buffer *bytes.Buffer) { + if buffer.Len() > c.maxSize { + message.Debugf("Pepr cache size %d exceeds max size %d, resetting", buffer.Len(), c.maxSize) + c.Reset() + return + } + c.lock.Lock() + defer c.lock.Unlock() + c.buffer = buffer +} + +// Serve attempts to serve a cached response if available. +func (c *Cache) Serve(w http.ResponseWriter) { + cachedBuffer := c.Get() + if cachedBuffer == nil || cachedBuffer.Len() == 0 { + return + } + + rest.WriteHeaders(w) + _, err := w.Write(cachedBuffer.Bytes()) + if err != nil { + message.Warnf("Pepr cache failed to write response: %v", err) + return + } + w.(http.Flusher).Flush() + message.Debug("Used pepr cache to serve response") +} diff --git a/pkg/api/monitor/pepr.go b/pkg/api/monitor/pepr.go index cd70601e..8f2fa484 100644 --- a/pkg/api/monitor/pepr.go +++ b/pkg/api/monitor/pepr.go @@ -33,6 +33,11 @@ func Pepr(w http.ResponseWriter, r *http.Request) { return } + // Only use cache for the default stream (empty streamFilter) + if streamFilter == "" { + streamCache.Serve(w) + } + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -51,12 +56,12 @@ func Pepr(w http.ResponseWriter, r *http.Request) { peprStream.Follow = true peprStream.Timestamps = true - //nolint:errcheck // Start the stream in a goroutine + message.Debug("Starting parent pepr stream goroutine") + //nolint:errcheck go peprStream.Start(ctx) // Create a timer to send keep-alive messages - // The first message is sent after 2 seconds to detect empty streams keepAliveTimer := time.NewTimer(2 * time.Second) defer keepAliveTimer.Stop() @@ -64,6 +69,9 @@ func Pepr(w http.ResponseWriter, r *http.Request) { flushTicker := time.NewTicker(time.Second) defer flushTicker.Stop() + // create tmp cached buffer to hold stream data + tmpCacheBuffer := &bytes.Buffer{} + for { select { // Check if the client has disconnected @@ -72,20 +80,26 @@ func Pepr(w http.ResponseWriter, r *http.Request) { return // Handle keep-alive messages - // See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#examples case <-keepAliveTimer.C: - // Set the keep-alive duration to 30 seconds after the first message keepAliveTimer.Reset(30 * time.Second) - bufferWriter.KeepAlive() // Flush every second if there is data case <-flushTicker.C: if bufferWriter.buffer.Len() > 0 { + // Write to both the response and the cache buffer + data := bufferWriter.buffer.Bytes() + if err := bufferWriter.Flush(w); err != nil { message.WarnErr(err, "Failed to flush buffer") return } + + // Update the cached buffer if on default stream + if streamFilter == "" { + tmpCacheBuffer.Write(data) + streamCache.Set(tmpCacheBuffer) + } } } } diff --git a/tasks/test.yaml b/tasks/test.yaml index 32b6b546..e5c0981d 100644 --- a/tasks/test.yaml +++ b/tasks/test.yaml @@ -10,11 +10,16 @@ tasks: - cmd: npm run test:install # install playwright dir: ui - task: build:api - - task: setup:k3d - - task: deploy-load + - task: deploy-runtime-cluster - cmd: npm run test:api-auth dir: ui + - name: deploy-runtime-cluster + description: deploy cluster specifically for testing the app + actions: + - task: setup:k3d + - task: deploy-load + - name: e2e description: "run end-to-end tests (assumes api server is running on port 8080)" actions: diff --git a/ui/src/routes/monitor/pepr/[[stream]]/+page.svelte b/ui/src/routes/monitor/pepr/[[stream]]/+page.svelte index 0a5747cd..09e82b09 100644 --- a/ui/src/routes/monitor/pepr/[[stream]]/+page.svelte +++ b/ui/src/routes/monitor/pepr/[[stream]]/+page.svelte @@ -12,7 +12,7 @@ import './page.postcss' - import { getDetails } from './helpers' + import { exportPeprStream, filterEvents, handlePeprMessage, sortEvents } from './helpers' let loaded = false let streamFilter = '' @@ -29,8 +29,7 @@ { value: 'failed', label: 'Errors and Denials' }, ] - const peprStream = writable([]) - export let columns = [ + let columns = [ { name: 'event', style: 'w-2/12' }, { name: 'resource', style: 'w-3/12' }, { name: 'details', style: 'w-1/12' }, @@ -39,6 +38,7 @@ ] // Initialize the stores + const peprStream = writable([]) let search = writable('') let sortBy = writable('timestamp') let sortAsc = writable(true) @@ -49,39 +49,6 @@ isFiltering = !!$search } - function filterEvents(events: PeprEvent[], searchTerm: string): PeprEvent[] { - // filter events by the search term if one exists - if (!searchTerm) return events - const searchValue = searchTerm.toLowerCase() - return events.filter( - (item) => - item._name.toLowerCase().includes(searchValue) || - item.event.toLowerCase().includes(searchValue) || - item.header.toLowerCase().includes(searchValue) || - item.msg.toLowerCase().includes(searchValue), - ) - } - - function sortEvents(events: PeprEvent[], sortKey: string, isAscending: boolean): PeprEvent[] { - const sortDirection = isAscending ? 1 : -1 // sort events in ascending order by default - // sort events based on the sort key - return events.sort((a, b) => { - if (sortKey === 'timestamp') { - const aTime = a.ts ? new Date(a.ts).getTime() : a.epoch - const bTime = b.ts ? new Date(b.ts).getTime() : b.epoch - return (aTime - bTime) * sortDirection - } else if (sortKey === 'count') { - const aValue = Number(a[sortKey as keyof typeof a]) || 0 - const bValue = Number(b[sortKey as keyof typeof b]) || 0 - return (aValue - bValue) * sortDirection - } else { - const aValue = String(a[sortKey as keyof typeof a] || '').toLowerCase() - const bValue = String(b[sortKey as keyof typeof b] || '').toLowerCase() - return aValue.localeCompare(bValue) * sortDirection - } - }) - } - export const rows = derived([peprStream, search, sortBy, sortAsc], () => { const filteredEvents = filterEvents($peprStream, $search) return sortEvents(filteredEvents, $sortBy, $sortAsc) @@ -114,30 +81,7 @@ } eventSource.onmessage = (e) => { - try { - const payload: PeprEvent = JSON.parse(e.data) - // The event type is the first word in the header - payload.event = payload.header.split(' ')[0] - payload.details = getDetails(payload) - - // If this is a repeated event, update the count - if (payload.repeated) { - // Find the first item in the peprStream that matches the header - peprStream.update((collection) => { - const idx = collection.findIndex((item) => item.header === payload.header) - if (idx !== -1) { - collection[idx].count = payload.repeated! - collection[idx].ts = payload.ts - } - return collection - }) - } else { - // Otherwise, add the new event to the peprStream - peprStream.update((collection) => [payload, ...collection]) - } - } catch (error) { - console.error('Error updating peprStream:', error) - } + handlePeprMessage(e, peprStream, $peprStream) } eventSource.onerror = (error) => { @@ -145,29 +89,6 @@ } }) - const exportPeprStream = () => { - const data = $rows.map((item) => ({ - event: item.event, - resource: item._name, - count: item.count, - timestamp: item.ts, - })) - - const blob = new Blob([JSON.stringify(data, null, 2)], { type: 'application/json' }) - const url = URL.createObjectURL(blob) - const a = document.createElement('a') - a.href = url - a.download = `pepr-stream-${new Date().toISOString()}.json` - - try { - a.click() - } finally { - setTimeout(() => { - URL.revokeObjectURL(url) - }, 100) // debounce to ensure download has started - } - } - const widths = ['w-1/6', 'w-1/3', 'w-1/4', 'w-2/5', 'w-1/2', 'w-1/5', 'w-1/3', 'w-1/4'] const skeletonRows = widths.sort(() => Math.random() - 0.5) @@ -231,7 +152,7 @@
- @@ -264,7 +185,7 @@ {#if $rows.length === 0} - No matching entries found + No matching entries found {:else} {#each $rows as item} diff --git a/ui/src/routes/monitor/pepr/[[stream]]/helpers.ts b/ui/src/routes/monitor/pepr/[[stream]]/helpers.ts index 04f2df2d..90b1b43b 100644 --- a/ui/src/routes/monitor/pepr/[[stream]]/helpers.ts +++ b/ui/src/routes/monitor/pepr/[[stream]]/helpers.ts @@ -1,4 +1,5 @@ import type { SvelteComponent } from 'svelte' +import type { Writable } from 'svelte/store' import type { PatchOperation, PeprDetails, PeprEvent } from '$lib/types' @@ -15,7 +16,7 @@ function decodeBase64(base64String: string) { } } -export function getDetails(payload: PeprEvent): PeprDetails | undefined { +function getDetails(payload: PeprEvent): PeprDetails | undefined { if (!payload.res) { return undefined } @@ -62,3 +63,97 @@ export function getDetails(payload: PeprEvent): PeprDetails | undefined { return undefined } + +export function filterEvents(events: PeprEvent[], searchTerm: string): PeprEvent[] { + // filter events by the search term if one exists + if (!searchTerm) return events + const searchValue = searchTerm.toLowerCase() + return events.filter( + (item) => + item._name.toLowerCase().includes(searchValue) || + item.event.toLowerCase().includes(searchValue) || + item.header.toLowerCase().includes(searchValue) || + item.msg.toLowerCase().includes(searchValue), + ) +} + +export function sortEvents(events: PeprEvent[], sortKey: string, isAscending: boolean): PeprEvent[] { + const sortDirection = isAscending ? 1 : -1 // sort events in ascending order by default + // sort events based on the sort key + return events.sort((a, b) => { + if (sortKey === 'timestamp') { + const aTime = a.ts ? new Date(a.ts).getTime() : a.epoch + const bTime = b.ts ? new Date(b.ts).getTime() : b.epoch + return (aTime - bTime) * sortDirection * -1 // latest events on top? + } else if (sortKey === 'count') { + const aValue = Number(a[sortKey as keyof typeof a]) || 0 + const bValue = Number(b[sortKey as keyof typeof b]) || 0 + return (aValue - bValue) * sortDirection + } else { + const aValue = String(a[sortKey as keyof typeof a] || '').toLowerCase() + const bValue = String(b[sortKey as keyof typeof b] || '').toLowerCase() + return aValue.localeCompare(bValue) * sortDirection + } + }) +} + +export const exportPeprStream = (rows: PeprEvent[]) => { + const data = rows.map((item) => ({ + event: item.event, + resource: item._name, + count: item.count, + timestamp: item.ts, + })) + + const blob = new Blob([JSON.stringify(data, null, 2)], { type: 'application/json' }) + const url = URL.createObjectURL(blob) + const a = document.createElement('a') + a.href = url + a.download = `pepr-stream-${new Date().toISOString()}.json` + + try { + a.click() + } finally { + setTimeout(() => { + URL.revokeObjectURL(url) + }, 100) // debounce to ensure download has started + } +} + +export function handlePeprMessage(e: MessageEvent, peprStreamStore: Writable, peprStream: PeprEvent[]) { + try { + const payload: PeprEvent = JSON.parse(e.data) + // The event type is the first word in the header + payload.event = payload.header.split(' ')[0] + payload.details = getDetails(payload) + + // handle "repeated"-type payloads + if (payload.repeated) { + // these events don't have a _name attribute so we just update the pepr stream row directly + const idx = peprStream.findIndex((item) => item.header === payload.header) + if (idx !== -1) { + peprStreamStore.update((collection) => { + collection[idx].count = payload.repeated! + collection[idx].ts = payload.ts + return collection + }) + } + return // "repeated"-type payload handled, no need to continue + } + + // check existing rows for duplicates + const dupIdx = peprStream.findIndex((item) => item.header === payload.header && item.ts === payload.ts) + if (dupIdx !== -1) { + // remove duplicate from the stream and update with the latest payload + peprStreamStore.update((collection) => { + collection.splice(dupIdx, 1) + return [payload, ...collection] + }) + return + } + // payload isn't a dup, add to stream + peprStreamStore.update((collection) => [payload, ...collection]) + } catch (error) { + console.error('Error updating peprStream:', error) + } +} diff --git a/ui/tests/monitor.spec.ts b/ui/tests/monitor.spec.ts index 23fd5887..3d6a3622 100644 --- a/ui/tests/monitor.spec.ts +++ b/ui/tests/monitor.spec.ts @@ -5,12 +5,39 @@ import * as fs from 'node:fs' import { expect, test } from '@playwright/test' -test.describe('Monitor', async () => { +// Run monitor tests in serial to ensure we capture the un-cached response time in the first test +test.describe.serial('Monitor', async () => { + // used to store the initial load time of the page before the cache is used + let initialLoadTime: number + test.beforeEach(async ({ page }) => { + const startTime = new Date().getTime() await page.goto('/monitor/pepr') // wait for data to load await page.waitForSelector('.pepr-event.ALLOWED') + + const endTime = new Date().getTime() + + // initial load time is the time to load the data when the cache isn't used (ie. the first visit to the page) + initialLoadTime = endTime - startTime + }) + + test('Pepr cache', async ({ page }) => { + // reload the page + await page.reload() + + // wait for data to load + const startTime = new Date().getTime() + await page.waitForSelector('.pepr-event.ALLOWED') + const endTime = new Date().getTime() + const cachedLoadTime = endTime - startTime + + // adding debug logs to help diagnose test failures + console.debug('Initial load time:', initialLoadTime) + console.debug('Cached load time:', cachedLoadTime) + + expect(cachedLoadTime).toBeLessThan(initialLoadTime) }) test('searching with "All Pepr Events selected', async ({ page }) => { @@ -58,8 +85,8 @@ test.describe('Monitor', async () => { expect(match).toBeTruthy() const newCount = parseInt(match![1]) - // sorted 'count' value should be greater than the original - expect(newCount).toBeGreaterThan(originalCount) + // sorted 'count' value should be greater than or equal to the original + expect(newCount).toBeGreaterThanOrEqual(originalCount) }) test('Exports logs', async ({ page }) => {