Skip to content

Commit

Permalink
feat(api): adds caching layer to pepr endpoints (#402)
Browse files Browse the repository at this point in the history
Co-authored-by: decleaver <85503726+decleaver@users.noreply.github.com>
  • Loading branch information
UncleGedd and decleaver authored Oct 9, 2024
1 parent e1111e9 commit 782d93b
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 96 deletions.
101 changes: 101 additions & 0 deletions pkg/api/monitor/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: 2024-Present The UDS Authors

package monitor

import (
"bytes"
"net/http"
"sync"
"time"

"github.com/defenseunicorns/uds-runtime/pkg/api/rest"
"github.com/zarf-dev/zarf/src/pkg/message"
)

// single instance of the pepr stream cache
var streamCache = NewCache()

// Cache is a simple cache for pepr stream data, it can be invalidated by a timer and max size
type Cache struct {
buffer *bytes.Buffer
lock sync.RWMutex
timer *time.Timer
maxSize int
}

// NewCache creates a new cache for pepr stream data
func NewCache() *Cache {
c := &Cache{
buffer: &bytes.Buffer{},
maxSize: 1024 * 1024 * 10, // 10MB
}
c.startResetTimer()
return c
}

// startResetTimer starts a timer that resets the cache after 5 minutes
func (c *Cache) startResetTimer() {
c.timer = time.AfterFunc(5*time.Minute, func() {
message.Debug("Pepr cache invalidated by timer, resetting cache")
c.Reset()
c.startResetTimer() // restart timer
})
}

// Reset resets the cache
func (c *Cache) Reset() {
c.lock.Lock()
defer c.lock.Unlock()
message.Debug("Resetting pepr cache")
c.buffer.Reset()
}

// Stop stops the cache timer
func (c *Cache) Stop() {
if c.timer != nil {
message.Debugf("Stopping pepr cache timer")
c.timer.Stop()
}
}

// Get returns a deep copy of cached buffer
func (c *Cache) Get() *bytes.Buffer {
c.lock.RLock()
defer c.lock.RUnlock()

if c.buffer == nil {
return nil
}

return bytes.NewBuffer(c.buffer.Bytes())
}

// Set sets the cached buffer
func (c *Cache) Set(buffer *bytes.Buffer) {
if buffer.Len() > c.maxSize {
message.Debugf("Pepr cache size %d exceeds max size %d, resetting", buffer.Len(), c.maxSize)
c.Reset()
return
}
c.lock.Lock()
defer c.lock.Unlock()
c.buffer = buffer
}

// Serve attempts to serve a cached response if available.
func (c *Cache) Serve(w http.ResponseWriter) {
cachedBuffer := c.Get()
if cachedBuffer == nil || cachedBuffer.Len() == 0 {
return
}

rest.WriteHeaders(w)
_, err := w.Write(cachedBuffer.Bytes())
if err != nil {
message.Warnf("Pepr cache failed to write response: %v", err)
return
}
w.(http.Flusher).Flush()
message.Debug("Used pepr cache to serve response")
}
24 changes: 19 additions & 5 deletions pkg/api/monitor/pepr.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func Pepr(w http.ResponseWriter, r *http.Request) {
return
}

// Only use cache for the default stream (empty streamFilter)
if streamFilter == "" {
streamCache.Serve(w)
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -51,19 +56,22 @@ func Pepr(w http.ResponseWriter, r *http.Request) {
peprStream.Follow = true
peprStream.Timestamps = true

//nolint:errcheck
// Start the stream in a goroutine
message.Debug("Starting parent pepr stream goroutine")
//nolint:errcheck
go peprStream.Start(ctx)

// Create a timer to send keep-alive messages
// The first message is sent after 2 seconds to detect empty streams
keepAliveTimer := time.NewTimer(2 * time.Second)
defer keepAliveTimer.Stop()

// Create a ticker to flush the buffer
flushTicker := time.NewTicker(time.Second)
defer flushTicker.Stop()

// create tmp cached buffer to hold stream data
tmpCacheBuffer := &bytes.Buffer{}

for {
select {
// Check if the client has disconnected
Expand All @@ -72,20 +80,26 @@ func Pepr(w http.ResponseWriter, r *http.Request) {
return

// Handle keep-alive messages
// See https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#examples
case <-keepAliveTimer.C:
// Set the keep-alive duration to 30 seconds after the first message
keepAliveTimer.Reset(30 * time.Second)

bufferWriter.KeepAlive()

// Flush every second if there is data
case <-flushTicker.C:
if bufferWriter.buffer.Len() > 0 {
// Write to both the response and the cache buffer
data := bufferWriter.buffer.Bytes()

if err := bufferWriter.Flush(w); err != nil {
message.WarnErr(err, "Failed to flush buffer")
return
}

// Update the cached buffer if on default stream
if streamFilter == "" {
tmpCacheBuffer.Write(data)
streamCache.Set(tmpCacheBuffer)
}
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions tasks/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,16 @@ tasks:
- cmd: npm run test:install # install playwright
dir: ui
- task: build:api
- task: setup:k3d
- task: deploy-load
- task: deploy-runtime-cluster
- cmd: npm run test:api-auth
dir: ui

- name: deploy-runtime-cluster
description: deploy cluster specifically for testing the app
actions:
- task: setup:k3d
- task: deploy-load

- name: e2e
description: "run end-to-end tests (assumes api server is running on port 8080)"
actions:
Expand Down
91 changes: 6 additions & 85 deletions ui/src/routes/monitor/pepr/[[stream]]/+page.svelte
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import './page.postcss'
import { getDetails } from './helpers'
import { exportPeprStream, filterEvents, handlePeprMessage, sortEvents } from './helpers'
let loaded = false
let streamFilter = ''
Expand All @@ -29,8 +29,7 @@
{ value: 'failed', label: 'Errors and Denials' },
]
const peprStream = writable<PeprEvent[]>([])
export let columns = [
let columns = [
{ name: 'event', style: 'w-2/12' },
{ name: 'resource', style: 'w-3/12' },
{ name: 'details', style: 'w-1/12' },
Expand All @@ -39,6 +38,7 @@
]
// Initialize the stores
const peprStream = writable<PeprEvent[]>([])
let search = writable<string>('')
let sortBy = writable<string>('timestamp')
let sortAsc = writable<boolean>(true)
Expand All @@ -49,39 +49,6 @@
isFiltering = !!$search
}
function filterEvents(events: PeprEvent[], searchTerm: string): PeprEvent[] {
// filter events by the search term if one exists
if (!searchTerm) return events
const searchValue = searchTerm.toLowerCase()
return events.filter(
(item) =>
item._name.toLowerCase().includes(searchValue) ||
item.event.toLowerCase().includes(searchValue) ||
item.header.toLowerCase().includes(searchValue) ||
item.msg.toLowerCase().includes(searchValue),
)
}
function sortEvents(events: PeprEvent[], sortKey: string, isAscending: boolean): PeprEvent[] {
const sortDirection = isAscending ? 1 : -1 // sort events in ascending order by default
// sort events based on the sort key
return events.sort((a, b) => {
if (sortKey === 'timestamp') {
const aTime = a.ts ? new Date(a.ts).getTime() : a.epoch
const bTime = b.ts ? new Date(b.ts).getTime() : b.epoch
return (aTime - bTime) * sortDirection
} else if (sortKey === 'count') {
const aValue = Number(a[sortKey as keyof typeof a]) || 0
const bValue = Number(b[sortKey as keyof typeof b]) || 0
return (aValue - bValue) * sortDirection
} else {
const aValue = String(a[sortKey as keyof typeof a] || '').toLowerCase()
const bValue = String(b[sortKey as keyof typeof b] || '').toLowerCase()
return aValue.localeCompare(bValue) * sortDirection
}
})
}
export const rows = derived([peprStream, search, sortBy, sortAsc], () => {
const filteredEvents = filterEvents($peprStream, $search)
return sortEvents(filteredEvents, $sortBy, $sortAsc)
Expand Down Expand Up @@ -114,60 +81,14 @@
}
eventSource.onmessage = (e) => {
try {
const payload: PeprEvent = JSON.parse(e.data)
// The event type is the first word in the header
payload.event = payload.header.split(' ')[0]
payload.details = getDetails(payload)
// If this is a repeated event, update the count
if (payload.repeated) {
// Find the first item in the peprStream that matches the header
peprStream.update((collection) => {
const idx = collection.findIndex((item) => item.header === payload.header)
if (idx !== -1) {
collection[idx].count = payload.repeated!
collection[idx].ts = payload.ts
}
return collection
})
} else {
// Otherwise, add the new event to the peprStream
peprStream.update((collection) => [payload, ...collection])
}
} catch (error) {
console.error('Error updating peprStream:', error)
}
handlePeprMessage(e, peprStream, $peprStream)
}
eventSource.onerror = (error) => {
console.error('EventSource failed:', error)
}
})
const exportPeprStream = () => {
const data = $rows.map((item) => ({
event: item.event,
resource: item._name,
count: item.count,
timestamp: item.ts,
}))
const blob = new Blob([JSON.stringify(data, null, 2)], { type: 'application/json' })
const url = URL.createObjectURL(blob)
const a = document.createElement('a')
a.href = url
a.download = `pepr-stream-${new Date().toISOString()}.json`
try {
a.click()
} finally {
setTimeout(() => {
URL.revokeObjectURL(url)
}, 100) // debounce to ensure download has started
}
}
const widths = ['w-1/6', 'w-1/3', 'w-1/4', 'w-2/5', 'w-1/2', 'w-1/5', 'w-1/3', 'w-1/4']
const skeletonRows = widths.sort(() => Math.random() - 0.5)
Expand Down Expand Up @@ -231,7 +152,7 @@
<div
class="flex flex-shrink-0 flex-col space-y-3 md:flex-row md:items-center md:space-x-3 md:space-y-0 lg:justify-end"
>
<button name="Export" type="button" on:click={exportPeprStream}>
<button name="Export" type="button" on:click={() => exportPeprStream($rows)}>
<Export class="mr-2" />
Export
</button>
Expand Down Expand Up @@ -264,7 +185,7 @@
<tbody>
{#if $rows.length === 0}
<tr>
<td colspan="4" class="text-center">No matching entries found</td>
<td colspan="5" class="text-center">No matching entries found</td>
</tr>
{:else}
{#each $rows as item}
Expand Down
Loading

0 comments on commit 782d93b

Please sign in to comment.