-
Notifications
You must be signed in to change notification settings - Fork 390
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(queue): observability for queue #2721
base: master
Are you sure you want to change the base?
Changes from 15 commits
722fb5d
1b2fd0b
e6dfb85
a75a8dd
1b176c6
66b85c5
54a2c18
2e3fe37
c4cb517
14b2f2e
7f6c674
059d50b
f70cc8a
8f38087
379abf5
3e8e0d8
6169ea3
908ec13
2dab35f
846e3ef
fc733e9
4c199f8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,14 @@ | ||
import { EventEmitter } from 'events'; | ||
import { QueueBaseOptions, RedisClient } from '../interfaces'; | ||
import { | ||
QueueBaseOptions, | ||
RedisClient, | ||
Span, | ||
Tracer, | ||
SetSpan, | ||
ContextManager, | ||
SpanKind, | ||
Propagation, | ||
} from '../interfaces'; | ||
import { MinimalQueue } from '../types'; | ||
import { | ||
delay, | ||
|
@@ -11,6 +20,7 @@ import { RedisConnection } from './redis-connection'; | |
import { Job } from './job'; | ||
import { KeysMap, QueueKeys } from './queue-keys'; | ||
import { Scripts } from './scripts'; | ||
import { TelemetryAttributes } from '../enums'; | ||
|
||
/** | ||
* @class QueueBase | ||
|
@@ -30,6 +40,16 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
protected connection: RedisConnection; | ||
public readonly qualifiedName: string; | ||
|
||
/** | ||
* Instance of a telemetry client | ||
* To use it create if statement in a method to observe with start and end of a span | ||
* It will check if tracer is provided and if not it will continue as is | ||
*/ | ||
private tracer: Tracer | undefined; | ||
private setSpan: SetSpan | undefined; | ||
protected contextManager: ContextManager | undefined; | ||
protected propagation: Propagation | undefined; | ||
|
||
/** | ||
* | ||
* @param name - The name of the queue. | ||
|
@@ -76,6 +96,13 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
this.keys = queueKeys.getKeys(name); | ||
this.toKey = (type: string) => queueKeys.toKey(name, type); | ||
this.setScripts(); | ||
|
||
if (opts?.telemetry) { | ||
this.tracer = opts.telemetry.trace.getTracer(opts.telemetry.tracerName); | ||
manast marked this conversation as resolved.
Show resolved
Hide resolved
|
||
this.setSpan = opts.telemetry.trace.setSpan; | ||
this.contextManager = opts.telemetry.contextManager; | ||
this.propagation = opts.telemetry.propagation; | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -175,4 +202,59 @@ export class QueueBase extends EventEmitter implements MinimalQueue { | |
} | ||
} | ||
} | ||
|
||
/** | ||
* Wraps the code with telemetry and provides span for configuration. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "... provides a span for configuration." |
||
* | ||
* @param spanType - type of the span: Producer, Consumer, Internal | ||
* @param getSpanName - name of the span | ||
* @param callback - code to wrap with telemetry | ||
* @returns | ||
*/ | ||
protected trace<T>( | ||
getSpanType: () => SpanKind, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We do not need to have a function for getting the span type here, the reason for having it for the span name is because as the span name usually requires some computation (like concatenating strings, etc), by having it as a callback it does not perform those computations if telemetry is not enabled. For the spanType we are just passing a const, so this will not impact performance in any way. |
||
getSpanName: () => string, | ||
callback: ( | ||
span?: Span, | ||
telemetryHeaders?: Record<string, string>, | ||
) => Promise<T> | T, | ||
activeTelemetryHeaders?: Record<string, string>, | ||
) { | ||
if (!this.tracer) { | ||
return callback(); | ||
} | ||
|
||
const span = this.tracer.startSpan(getSpanName(), { | ||
kind: getSpanType(), | ||
}); | ||
|
||
span.setAttributes({ | ||
[TelemetryAttributes.QueueName]: this.name, | ||
}); | ||
|
||
try { | ||
if (activeTelemetryHeaders) { | ||
const activeContext = this.propagation.extract( | ||
this.contextManager.active(), | ||
activeTelemetryHeaders, | ||
); | ||
|
||
return this.contextManager.with(activeContext, () => callback(span)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume contextManager.with will (for the Otel case), create a parent-child relationship for the spans? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, in this case we need to call it like awaiting like this: return await this.contextManager.with(activeContext, () => callback(span)); The reason being that we want to catch potential exceptions thrown by the callback, without this await the exception will just bubble up and we loose the ability to record the exception in the tracer. |
||
} | ||
|
||
const telemetryHeaders: Record<string, string> = {}; | ||
|
||
this.propagation.inject(this.contextManager.active(), telemetryHeaders); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, so we are going to always inject the headers, hmm, we may need to give it a bit more of thought, not sure this is 100% correct. |
||
|
||
return this.contextManager.with( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same issue here we need to await before returning. |
||
this.setSpan(this.contextManager.active(), span), | ||
() => callback(span, telemetryHeaders), | ||
); | ||
} catch (err) { | ||
span.recordException(err as Error); | ||
throw err; | ||
} finally { | ||
span.end(); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not think this comment is correct anymore as we are using the trace helper.