Tags: kafka spring reactive oauth2
When Kafka consumers use reactive OAuth2 to secure outbound HTTP calls, a common pitfall arises: Kafka processing typically runs in a background, non-HTTP context. There is therefore no ServerWebExchange, the reactive web stack's core abstraction that exposes the current HTTP request/response and request-scoped attributes.
ServerWebExchange is Absent in Kafka Consumers
ServerBearerExchangeFilterFunction Limitations: While ServerBearerExchangeFilterFunction might seem like a solution, it only relays the bearer token of the current Authentication, which it reads from the reactive security context that is populated while an HTTP request is being processed. In a Kafka listener there is normally no such context, so there is no token to relay. If your Kafka processing happens to be part of an existing HTTP request flow (which is rare), it might appear to work, but this is not its intended use for background tasks.
Server-side helpers like ServerOAuth2AuthorizedClientExchangeFilterFunction, when backed by the default DefaultReactiveOAuth2AuthorizedClientManager, or any other component that depends on ServerWebExchange, will fail in background contexts with errors such as "serverWebExchange cannot be null". This is because these components are built to operate within the lifecycle of an HTTP request.
For Kafka consumers and other non-HTTP contexts, the client-credentials grant flow must be handled programmatically. This involves configuring a reactive OAuth2 client stack that can obtain and manage tokens independently of an HTTP request.
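To make the grant itself concrete, the sketch below builds (but does not send) the kind of token request a client-credentials flow boils down to: a form POST with grant_type=client_credentials, authenticated with HTTP Basic. It uses only the JDK and is an illustration of the protocol, not Spring Security's implementation; the class name, method names, and parameters are hypothetical, and it assumes the authorization server accepts Basic client authentication.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

final class ClientCredentialsRequestSketch {

    // "Basic " + base64(clientId:clientSecret).
    // Assumption: the credentials contain no characters that need extra encoding.
    static String basicAuth(String clientId, String clientSecret) {
        String raw = clientId + ":" + clientSecret;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Form body of a client-credentials token request.
    static String formBody(String scope) {
        return "grant_type=client_credentials&scope="
                + URLEncoder.encode(scope, StandardCharsets.UTF_8);
    }

    // Builds the token request without sending it; no HTTP exchange is involved.
    static HttpRequest tokenRequest(String tokenUri, String clientId,
                                    String clientSecret, String scope) {
        return HttpRequest.newBuilder(URI.create(tokenUri))
                .header("Authorization", basicAuth(clientId, clientSecret))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(scope)))
                .build();
    }
}
```

Nothing in this request depends on an inbound HTTP request, which is why the flow suits background workers such as Kafka listeners.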
Key Components for a Reactive OAuth2 Client Stack:
ReactiveClientRegistrationRepository: Manages client registrations, providing details about OAuth2 clients (e.g., client ID, client secret, authorization grant type).
InMemoryReactiveOAuth2AuthorizedClientService: Stores and retrieves OAuth2AuthorizedClient instances in memory. For production, consider a persistent store.
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager: This manager is responsible for authorizing OAuth2 clients. It uses the ReactiveClientRegistrationRepository and InMemoryReactiveOAuth2AuthorizedClientService to obtain and refresh tokens. Crucially, it needs to be configured with a clientCredentials() provider for non-HTTP contexts.
Minimal Sketch (Reactive, Conceptual):
The following code snippet illustrates how to set up and use these components to obtain an OAuth2 token and attach it to a WebClient request:
import org.springframework.security.oauth2.client.AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager;
import org.springframework.security.oauth2.client.InMemoryReactiveOAuth2AuthorizedClientService;
import org.springframework.security.oauth2.client.OAuth2AuthorizeRequest;
import org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository;
import org.springframework.security.oauth2.client.ReactiveOAuth2AuthorizedClientProviderBuilder;
import org.springframework.web.reactive.function.client.WebClient;

// Assume 'regs' is an instance of ReactiveClientRegistrationRepository,
// configured with your client registration details (e.g., "service-client").
ReactiveClientRegistrationRepository regs = ...;

// Initialize the client service to store authorized clients.
InMemoryReactiveOAuth2AuthorizedClientService clientService =
        new InMemoryReactiveOAuth2AuthorizedClientService(regs);

// Configure the authorized client manager for client_credentials grant.
AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager manager =
        new AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(regs, clientService);
manager.setAuthorizedClientProvider(
        ReactiveOAuth2AuthorizedClientProviderBuilder.builder().clientCredentials().build());

// Create an authorization request for your service client.
OAuth2AuthorizeRequest authReq = OAuth2AuthorizeRequest.withClientRegistrationId("service-client")
        .principal("kafka-worker") // A principal name for the authorization request
        .build();

// Use the manager to obtain an OAuth2AuthorizedClient and then make a WebClient call.
manager.authorize(authReq)
        .flatMap(authorizedClient -> {
            // Build your WebClient (e.g., using WebClient.builder().build())
            WebClient webClient = WebClient.builder().build();
            return webClient.get()
                    .uri("https://api.example.com")
                    .headers(headers -> headers.setBearerAuth(authorizedClient.getAccessToken().getTokenValue()))
                    .retrieve()
                    .bodyToMono(String.class);
        })
        .subscribe(
                response -> System.out.println("API Response: " + response),
                error -> System.err.println("Error calling API: " + error.getMessage())
        );
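
Two pieces are left open above: how the registration repository gets built, and whether the token has to be attached by hand on every call. The sketch below is one possible way to fill both in, under stated assumptions. The class name KafkaWorkerWebClientSketch, the token URI, the client ID, and the client secret are all placeholders invented for illustration; only the "service-client" registration ID comes from the snippet above. It also assumes spring-security-oauth2-client and spring-webflux are on the classpath. It passes the service-based manager to ServerOAuth2AuthorizedClientExchangeFilterFunction, so the filter obtains tokens through that manager rather than through a ServerWebExchange.

```java
import org.springframework.security.oauth2.client.AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager;
import org.springframework.security.oauth2.client.InMemoryReactiveOAuth2AuthorizedClientService;
import org.springframework.security.oauth2.client.ReactiveOAuth2AuthorizedClientProviderBuilder;
import org.springframework.security.oauth2.client.registration.ClientRegistration;
import org.springframework.security.oauth2.client.registration.InMemoryReactiveClientRegistrationRepository;
import org.springframework.security.oauth2.client.registration.ReactiveClientRegistrationRepository;
import org.springframework.security.oauth2.client.web.reactive.function.client.ServerOAuth2AuthorizedClientExchangeFilterFunction;
import org.springframework.security.oauth2.core.AuthorizationGrantType;
import org.springframework.web.reactive.function.client.WebClient;

final class KafkaWorkerWebClientSketch {

    // Hypothetical in-memory registration; every value except the
    // registration ID is a placeholder to replace with real settings.
    static ReactiveClientRegistrationRepository registrations() {
        ClientRegistration registration = ClientRegistration.withRegistrationId("service-client")
                .tokenUri("https://auth.example.com/oauth2/token")
                .clientId("my-client-id")
                .clientSecret("my-client-secret")
                .authorizationGrantType(AuthorizationGrantType.CLIENT_CREDENTIALS)
                .build();
        return new InMemoryReactiveClientRegistrationRepository(registration);
    }

    // WebClient whose filter asks the service-based manager for a token on
    // each request, so no ServerWebExchange is required.
    static WebClient webClient(ReactiveClientRegistrationRepository regs) {
        AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager manager =
                new AuthorizedClientServiceReactiveOAuth2AuthorizedClientManager(
                        regs, new InMemoryReactiveOAuth2AuthorizedClientService(regs));
        manager.setAuthorizedClientProvider(
                ReactiveOAuth2AuthorizedClientProviderBuilder.builder().clientCredentials().build());

        ServerOAuth2AuthorizedClientExchangeFilterFunction oauth =
                new ServerOAuth2AuthorizedClientExchangeFilterFunction(manager);
        oauth.setDefaultClientRegistrationId("service-client");

        return WebClient.builder().filter(oauth).build();
    }
}
```

A Kafka listener could then call webClient.get().uri("https://api.example.com").retrieve().bodyToMono(String.class) without setting the Authorization header itself. Whether to use the filter or call manager.authorize(...) explicitly is a design choice: the filter keeps call sites short, while the explicit call makes token acquisition and its errors more visible.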