Skip to content

Commit

Permalink
BE: Implement audit log level (#4103)
Browse files Browse the repository at this point in the history
  • Loading branch information
iliax committed Aug 11, 2023
1 parent fa9547b commit b32ab01
Show file tree
Hide file tree
Showing 17 changed files with 213 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,13 @@ public static class AuditProperties {
Integer auditTopicsPartitions;
Boolean topicAuditEnabled;
Boolean consoleAuditEnabled;
LogLevel level;
Map<String, String> auditTopicProperties;

public enum LogLevel {
ALL,
ALTER_ONLY //default
}
}

@PostConstruct
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum AclAction implements PermissibleAction {

VIEW,
EDIT;
EDIT

;

public static final Set<AclAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static AclAction fromString(String name) {
return EnumUtils.getEnum(AclAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -10,9 +11,15 @@ public enum ApplicationConfigAction implements PermissibleAction {

;

public static final Set<ApplicationConfigAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static ApplicationConfigAction fromString(String name) {
return EnumUtils.getEnum(ApplicationConfigAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum AuditAction implements PermissibleAction {

VIEW;
VIEW

;

private static final Set<AuditAction> ALTER_ACTIONS = Set.of();

@Nullable
public static AuditAction fromString(String name) {
return EnumUtils.getEnum(AuditAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -10,9 +11,15 @@ public enum ClusterConfigAction implements PermissibleAction {

;

public static final Set<ClusterConfigAction> ALTER_ACTIONS = Set.of(EDIT);

@Nullable
public static ClusterConfigAction fromString(String name) {
return EnumUtils.getEnum(ClusterConfigAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -12,9 +13,15 @@ public enum ConnectAction implements PermissibleAction {

;

public static final Set<ConnectAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, RESTART);

@Nullable
public static ConnectAction fromString(String name) {
return EnumUtils.getEnum(ConnectAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum ConsumerGroupAction implements PermissibleAction {

VIEW,
DELETE,

RESET_OFFSETS

;

public static final Set<ConsumerGroupAction> ALTER_ACTIONS = Set.of(DELETE, RESET_OFFSETS);

@Nullable
public static ConsumerGroupAction fromString(String name) {
return EnumUtils.getEnum(ConsumerGroupAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,15 +1,24 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

public enum KsqlAction implements PermissibleAction {

EXECUTE;
EXECUTE

;

public static final Set<KsqlAction> ALTER_ACTIONS = Set.of(EXECUTE);

@Nullable
public static KsqlAction fromString(String name) {
return EnumUtils.getEnum(KsqlAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,9 @@ public sealed interface PermissibleAction permits
ConsumerGroupAction, SchemaAction,
ConnectAction, ClusterConfigAction,
KsqlAction, TopicAction, AuditAction {

String name();

boolean isAlter();

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -13,9 +14,15 @@ public enum SchemaAction implements PermissibleAction {

;

public static final Set<SchemaAction> ALTER_ACTIONS = Set.of(CREATE, DELETE, EDIT, MODIFY_GLOBAL_COMPATIBILITY);

@Nullable
public static SchemaAction fromString(String name) {
return EnumUtils.getEnum(SchemaAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.model.rbac.permission;

import java.util.Set;
import org.apache.commons.lang3.EnumUtils;
import org.jetbrains.annotations.Nullable;

Expand All @@ -9,16 +10,21 @@ public enum TopicAction implements PermissibleAction {
CREATE,
EDIT,
DELETE,

MESSAGES_READ,
MESSAGES_PRODUCE,
MESSAGES_DELETE,

;

public static final Set<TopicAction> ALTER_ACTIONS = Set.of(CREATE, EDIT, DELETE, MESSAGES_PRODUCE, MESSAGES_DELETE);

@Nullable
public static TopicAction fromString(String name) {
return EnumUtils.getEnum(TopicAction.class, name);
}

@Override
public boolean isAlter() {
return ALTER_ACTIONS.contains(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.provectus.kafka.ui.exception.ValidationException;
import com.provectus.kafka.ui.model.rbac.AccessContext;
import com.provectus.kafka.ui.model.rbac.Resource;
import com.provectus.kafka.ui.model.rbac.permission.PermissibleAction;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -33,33 +34,37 @@ String toJson() {
return MAPPER.writeValueAsString(this);
}

record AuditResource(String accessType, Resource type, @Nullable Object id) {
record AuditResource(String accessType, boolean alter, Resource type, @Nullable Object id) {

private static AuditResource create(PermissibleAction action, Resource type, @Nullable Object id) {
return new AuditResource(action.name(), action.isAlter(), type, id);
}

static List<AuditResource> getAccessedResources(AccessContext ctx) {
List<AuditResource> resources = new ArrayList<>();
ctx.getClusterConfigActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CLUSTERCONFIG, null)));
.forEach(a -> resources.add(create(a, Resource.CLUSTERCONFIG, null)));
ctx.getTopicActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.TOPIC, nameId(ctx.getTopic()))));
.forEach(a -> resources.add(create(a, Resource.TOPIC, nameId(ctx.getTopic()))));
ctx.getConsumerGroupActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
.forEach(a -> resources.add(create(a, Resource.CONSUMER, nameId(ctx.getConsumerGroup()))));
ctx.getConnectActions()
.forEach(a -> {
Map<String, String> resourceId = new LinkedHashMap<>();
resourceId.put("connect", ctx.getConnect());
if (ctx.getConnector() != null) {
resourceId.put("connector", ctx.getConnector());
}
resources.add(new AuditResource(a.name(), Resource.CONNECT, resourceId));
resources.add(create(a, Resource.CONNECT, resourceId));
});
ctx.getSchemaActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.SCHEMA, nameId(ctx.getSchema()))));
.forEach(a -> resources.add(create(a, Resource.SCHEMA, nameId(ctx.getSchema()))));
ctx.getKsqlActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.KSQL, null)));
.forEach(a -> resources.add(create(a, Resource.KSQL, null)));
ctx.getAclActions()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.ACL, null)));
.forEach(a -> resources.add(create(a, Resource.ACL, null)));
ctx.getAuditAction()
.forEach(a -> resources.add(new AuditResource(a.name(), Resource.AUDIT, null)));
.forEach(a -> resources.add(create(a, Resource.AUDIT, null)));
return resources;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.provectus.kafka.ui.service.audit;

import static com.provectus.kafka.ui.config.ClustersProperties.AuditProperties.LogLevel.ALTER_ONLY;
import static com.provectus.kafka.ui.service.MessagesService.createProducer;

import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -80,12 +81,13 @@ static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
}
boolean topicAudit = Optional.ofNullable(auditProps.getTopicAuditEnabled()).orElse(false);
boolean consoleAudit = Optional.ofNullable(auditProps.getConsoleAuditEnabled()).orElse(false);
boolean alterLogOnly = Optional.ofNullable(auditProps.getLevel()).map(lvl -> lvl == ALTER_ONLY).orElse(true);
if (!topicAudit && !consoleAudit) {
return Optional.empty();
}
if (!topicAudit) {
log.info("Audit initialization finished for cluster '{}' (console only)", cluster.getName());
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
String auditTopicName = Optional.ofNullable(auditProps.getTopic()).orElse(DEFAULT_AUDIT_TOPIC_NAME);
boolean topicAuditCanBeDone = createTopicIfNeeded(cluster, acSupplier, auditTopicName, auditProps);
Expand All @@ -95,23 +97,24 @@ static Optional<AuditWriter> createAuditWriter(KafkaCluster cluster,
"Audit initialization finished for cluster '{}' (console only, topic audit init failed)",
cluster.getName()
);
return Optional.of(consoleOnlyWriter(cluster));
return Optional.of(consoleOnlyWriter(cluster, alterLogOnly));
}
return Optional.empty();
}
log.info("Audit initialization finished for cluster '{}'", cluster.getName());
return Optional.of(
new AuditWriter(
cluster.getName(),
alterLogOnly,
auditTopicName,
producerFactory.get(),
consoleAudit ? AUDIT_LOGGER : null
)
);
}

private static AuditWriter consoleOnlyWriter(KafkaCluster cluster) {
return new AuditWriter(cluster.getName(), null, null, AUDIT_LOGGER);
private static AuditWriter consoleOnlyWriter(KafkaCluster cluster, boolean alterLogOnly) {
return new AuditWriter(cluster.getName(), alterLogOnly, null, null, AUDIT_LOGGER);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

@Slf4j
record AuditWriter(String clusterName,
boolean logAlterOperationsOnly,
@Nullable String targetTopic,
@Nullable KafkaProducer<byte[], byte[]> producer,
@Nullable Logger consoleLogger) implements Closeable {
Expand All @@ -39,6 +40,10 @@ void write(AccessContext ctx, AuthenticatedUser user, @Nullable Throwable th) {
}

private void write(AuditRecord rec) {
if (logAlterOperationsOnly && rec.resources().stream().noneMatch(AuditResource::alter)) {
//we should only log alter operations, but this is read-only op
return;
}
String json = rec.toJson();
if (consoleLogger != null) {
consoleLogger.info(json);
Expand Down
Loading

0 comments on commit b32ab01

Please sign in to comment.