Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Adding authorization capabilities to DoctorKafka (#143)
Browse files Browse the repository at this point in the history
  • Loading branch information
ambud authored and yuyang08 committed May 21, 2019
1 parent 3d4beb1 commit e8af094
Show file tree
Hide file tree
Showing 11 changed files with 237 additions and 8 deletions.
23 changes: 22 additions & 1 deletion docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,25 @@ DELETE will remove the cluster from maintenance mode.
curl -XGET http://localhost:8080/api/cluster/<clustername>/admin/maintenance
curl -XPUT http://localhost:8080/api/cluster/<clustername>/admin/maintenance
curl -XDELETE http://localhost:8080/api/cluster/<clustername>/admin/maintenance
```
```

**API Security**

Dr. Kafka allows plugable API request authorization and follows the Role Based Access Control (RBAC) model. Authorization is performed by populating role-mapping in [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java) by creating an implementation of AuthorizationFilter e.g. [SampleAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/SampleAuthorizationFilter.java)

Here's the flow sequence:
1. DoctorKafkaMain checks if an authorization filter has been specified via `doctorkafka.authorization.filter.class` configuration and creates an instance of `DrKafkaAuthorizationFilter`
2. This instance is then configured (invoke `configure(DoctorKafkaConfig config)`) and registered with Jersey

All authorization filters must implement [DrKafkaAuthorizationFilter](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaAuthorizationFilter.java) which has two methods that need to be implemented:

- `configure(DoctorKafkaConfig config)`
- `filter(ContainerRequestContext requestContext)`

`configure(DoctorKafkaConfig config)` provides DoctorKafkaConfig to allow authorizer to configure, `DoctorKafkaConfig.getDrKafkaAdminGroups()` returns the list of groups that need to be mapped to `drkafka_admin` role

`filter(ContainerRequestContext requestContext)` should implement the logic to extract and populate PRINCIPAL & ROLE information which is needed to create a new instance of [DrKafkaSecurityContext](https://github.com/pinterest/doctorkafka/tree/master/drkafka/src/main/java/com/pinterest/doctorkafka/security/DrKafkaSecurityContext.java). Jersey then uses this information to restricted access to methods for users who are not in the `drkafka_admin` role. Here's the flow:

(Authentication) -> (Populates user & group info headers) -> (YourDrKafkaAuthoriziationFilter) -> (extract User and Group info) -> (Map groups to roles) -> (Create SecurityContext) -> (Inject SecurityContext back in session)

Note: We currently don't ship authentication mechanisms with Dr.Kafka since authentication requirements are environment/company specific. For plugable authentication, please refer to https://www.dropwizard.io/1.3.8/docs/manual/auth.html You may also use an authentication proxy.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.glassfish.jersey.server.filter.RolesAllowedDynamicFeature;

import com.google.common.collect.ImmutableList;

Expand All @@ -16,6 +17,7 @@
import com.pinterest.doctorkafka.config.DoctorKafkaAppConfig;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter;
import com.pinterest.doctorkafka.servlet.ClusterInfoServlet;
import com.pinterest.doctorkafka.servlet.DoctorKafkaActionsServlet;
import com.pinterest.doctorkafka.servlet.DoctorKafkaBrokerStatsServlet;
Expand Down Expand Up @@ -68,7 +70,7 @@ public void run(DoctorKafkaAppConfig configuration, Environment environment) thr

doctorKafka = new DoctorKafka(replicaStatsManager);

registerAPIs(environment, doctorKafka);
registerAPIs(environment, doctorKafka, replicaStatsManager.getConfig());
registerServlets(environment);

Executors.newCachedThreadPool().submit(() -> {
Expand Down Expand Up @@ -115,14 +117,31 @@ private void configureServerRuntime(DoctorKafkaAppConfig configuration, DoctorKa
defaultServerFactory.setApplicationConnectors(Collections.singletonList(application));
}

private void registerAPIs(Environment environment, DoctorKafka doctorKafka) {
private void registerAPIs(Environment environment, DoctorKafka doctorKafka, DoctorKafkaConfig doctorKafkaConfig) {
environment.jersey().setUrlPattern("/api/*");
checkAndInitializeAuthorizationFilter(environment, doctorKafkaConfig);
environment.jersey().register(new BrokersApi(doctorKafka));
environment.jersey().register(new ClustersApi(doctorKafka));
environment.jersey().register(new ClustersMaintenanceApi(doctorKafka));
environment.jersey().register(new BrokersDecommissionApi(doctorKafka));
}

private void checkAndInitializeAuthorizationFilter(Environment environment, DoctorKafkaConfig doctorKafkaConfig) {
LOG.info("Checking authorization filter");
try {
Class<? extends DrKafkaAuthorizationFilter> authorizationFilterClass = doctorKafkaConfig.getAuthorizationFilterClass();
if (authorizationFilterClass != null) {
DrKafkaAuthorizationFilter filter = authorizationFilterClass.newInstance();
filter.configure(doctorKafkaConfig);
LOG.info("Using authorization filer:" + filter.getClass().getName());
environment.jersey().register(filter);
environment.jersey().register(RolesAllowedDynamicFeature.class);
}
} catch (Exception e) {
LOG.error("Failed to get and initialize DrKafkaAuthorizationFilter", e);
}
}

private void startMetricsService() {
int ostrichPort = replicaStatsManager.getConfig().getOstrichPort();
String tsdHostPort = replicaStatsManager.getConfig().getTsdHostPort();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.pinterest.doctorkafka.api;

import java.util.List;
import java.util.stream.Collectors;

import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
Expand All @@ -11,7 +10,6 @@
import javax.ws.rs.core.MediaType;

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.DoctorKafkaMain;
import com.pinterest.doctorkafka.KafkaBroker;
import com.pinterest.doctorkafka.KafkaClusterManager;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.pinterest.doctorkafka.api;

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.util.ApiUtils;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import javax.annotation.security.RolesAllowed;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand Down Expand Up @@ -34,6 +36,7 @@ public boolean isBrokerDecommissioned(@PathParam("clusterName") String clusterNa
}

@PUT
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void decommissionBroker(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName,
@PathParam("brokerId") String brokerIdStr) {
Expand All @@ -42,6 +45,7 @@ public void decommissionBroker(@Context HttpServletRequest ctx,
}

@DELETE
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void cancelDecommissionBroker(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName,
@PathParam("brokerId") String brokerIdStr) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.pinterest.doctorkafka.api;

import javax.annotation.security.RolesAllowed;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
Expand All @@ -16,6 +17,7 @@

import com.pinterest.doctorkafka.DoctorKafka;
import com.pinterest.doctorkafka.KafkaClusterManager;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.util.ApiUtils;

@Path("/clusters/{clusterName}/admin/maintenance")
Expand All @@ -36,6 +38,7 @@ public boolean checkMaintenance(@PathParam("clusterName") String clusterName) {
}

@PUT
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void enableMaintenance(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName) {
KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName);
Expand All @@ -44,6 +47,7 @@ public void enableMaintenance(@Context HttpServletRequest ctx,
}

@DELETE
@RolesAllowed({ DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE })
public void disableMaintenance(@Context HttpServletRequest ctx,
@PathParam("clusterName") String clusterName) {
KafkaClusterManager clusterManager = checkAndGetClusterManager(clusterName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,4 @@ protected KafkaBroker checkAndGetBroker(String clusterName, String brokerId) {
return broker;
}



}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.pinterest.doctorkafka.security.DrKafkaAuthorizationFilter;

import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -43,6 +47,9 @@ public class DoctorKafkaConfig {
private static final String NOTIFICATION_EMAILS = "emails.notification";
private static final String ALERT_EMAILS = "emails.alert";
private static final String WEB_BIND_HOST = "web.bindhost";
public static final String DRKAFKA_ADMIN_ROLE = "drkafka_admin";
private static final String DRKAFKA_ADMIN_GROUPS = "admin.groups";
private static final String AUTHORIZATION_FILTER_CLASS = "authorization.filter.class";

private PropertiesConfiguration configuration = null;
private AbstractConfiguration drkafkaConfiguration = null;
Expand Down Expand Up @@ -218,4 +225,32 @@ public String[] getAlertEmails() {
public boolean getRestartDisabled(){
return drkafkaConfiguration.getBoolean(RESTART_DISABLE, false);
}

/**
* Return authorization filter class (if any)
* @return authorization filter class
* @throws ClassNotFoundException
*/
@SuppressWarnings("unchecked")
public Class<? extends DrKafkaAuthorizationFilter> getAuthorizationFilterClass() throws ClassNotFoundException {
if (drkafkaConfiguration.containsKey(AUTHORIZATION_FILTER_CLASS)) {
String classFqcn = drkafkaConfiguration.getString(AUTHORIZATION_FILTER_CLASS);
return (Class<? extends DrKafkaAuthorizationFilter>) Class.forName(classFqcn);
} else {
return null;
}
}

/**
* Groups from directory service (like LDAP) that are granted Dr.Kafka Admin
* permissions to run privileged commands.
* @return list of groups
*/
public List<String> getDrKafkaAdminGroups() {
if (drkafkaConfiguration.containsKey(DRKAFKA_ADMIN_GROUPS)) {
return Arrays.asList(drkafkaConfiguration.getStringArray(DRKAFKA_ADMIN_GROUPS));
} else {
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.pinterest.doctorkafka.security;

import javax.ws.rs.container.ContainerRequestFilter;

import com.pinterest.doctorkafka.config.DoctorKafkaConfig;

/**
* This extends JAX-RS containter request filter for authorization.
*
* Please refer to https://docs.oracle.com/javaee/7/api/javax/ws/rs/container/ContainerRequestFilter.html
* for more details on how {@link ContainerRequestFilter} works
*/
public interface DrKafkaAuthorizationFilter extends ContainerRequestFilter {

public void configure(DoctorKafkaConfig config) throws Exception;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.pinterest.doctorkafka.security;

import java.security.Principal;
import java.util.Set;

import javax.ws.rs.core.SecurityContext;

public class DrKafkaSecurityContext implements SecurityContext {

private static final String DR_KAFKA_AUTH = "drkauth";
private UserPrincipal principal;
private Set<String> roles;

public DrKafkaSecurityContext(UserPrincipal principal, Set<String> roles) {
this.principal = principal;
this.roles = roles;
}

@Override
public Principal getUserPrincipal() {
return principal;
}

@Override
public boolean isUserInRole(String role) {
return roles.contains(role);
}

@Override
public boolean isSecure() {
return true;
}

@Override
public String getAuthenticationScheme() {
return DR_KAFKA_AUTH;
}

@Override
public String toString() {
return "DrKafkaSecurityContext [principal=" + principal + ", roles=" + roles + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.pinterest.doctorkafka.security;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import javax.annotation.Priority;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.ext.Provider;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.pinterest.doctorkafka.config.DoctorKafkaConfig;

import jersey.repackaged.com.google.common.collect.Sets;
import jersey.repackaged.com.google.common.collect.Sets.SetView;

/**
* This is a sample implementation of {@link DrKafkaAuthorizationFilter}
*/
@Provider
@Priority(1000)
public class SampleAuthorizationFilter implements DrKafkaAuthorizationFilter {

private static final Logger LOG = LogManager.getLogger(SampleAuthorizationFilter.class);
private static final String GROUPS_HEADER = "GROUPS";
private static final String USER_HEADER = "USER";
private Set<String> allowedAdminGroups = new HashSet<>();
private static final Set<String> ADMIN_ROLE_SET = new HashSet<>(
Arrays.asList(DoctorKafkaConfig.DRKAFKA_ADMIN_ROLE));
private static final Set<String> EMPTY_ROLE_SET = new HashSet<>();

@Override
public void configure(DoctorKafkaConfig config) throws Exception {
List<String> drKafkaAdminGroups = config.getDrKafkaAdminGroups();
if (drKafkaAdminGroups != null) {
allowedAdminGroups.addAll(drKafkaAdminGroups);
LOG.info("Following groups will be allowed admin access:" + allowedAdminGroups);
}
}

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
String userHeader = requestContext.getHeaderString(USER_HEADER);
String groupsHeader = requestContext.getHeaderString(GROUPS_HEADER);
DrKafkaSecurityContext ctx = null;
if (userHeader != null && groupsHeader != null) {
Set<String> userGroups = new HashSet<>(Arrays.asList(groupsHeader.split(",")));
SetView<String> intersection = Sets.intersection(allowedAdminGroups, userGroups);
if (intersection.size() > 0) {
ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), ADMIN_ROLE_SET);
requestContext.setSecurityContext(ctx);
LOG.info("Received authenticated request, created context:" + ctx);
return;
}
}

ctx = new DrKafkaSecurityContext(new UserPrincipal(userHeader), EMPTY_ROLE_SET);
requestContext.setSecurityContext(ctx);
LOG.info("Received annonymous request, bypassing authorizer");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.pinterest.doctorkafka.security;

import java.security.Principal;

public class UserPrincipal implements Principal {

private String username;

public UserPrincipal(String username) {
this.username = username;
}

@Override
public String getName() {
return username;
}

@Override
public String toString() {
return "UserPrincipal [username=" + username + "]";
}

}

0 comments on commit e8af094

Please sign in to comment.