Confluent MCP Server
Official Confluent MCP server for Kafka, Flink, Schema Registry, Connectors, Tableflow, and billing management
Score Breakdown
Server Info
- Package
- @confluentinc/mcp-confluent
- Registry
- npm
- Repository
- confluentinc/mcp-confluent
- Maintainer
- Community
- Category
- Analytics & Data
- Tags
- streaming, messaging, events
- Last Scanned
- 7 Apr 2026
Findings
7 issues
Authentication & Identity
MEDIUM: HTTP/SSE transport supports per-request credentials
Supports stdio, SSE, and Streamable HTTP transports via Fastify. The HTTP/SSE transports support API-key auth (MCP_API_KEY, min 32 chars) with DNS-rebinding protection (MCP_ALLOWED_HOSTS); auth can be disabled for development via MCP_AUTH_DISABLED. There is no MCP OAuth support. Tools are auto-enabled or disabled based on which API-key environment variables are present, and each handler declares its required variables via getRequiredEnvVars().
Implement the MCP OAuth spec so users authenticate directly without platform mediation.
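The two HTTP-transport checks described above can be sketched as a single guard function. This is a hedged illustration, not the server's actual Fastify wiring: `checkRequest` and its signature are hypothetical, though the env vars (MCP_API_KEY, MCP_ALLOWED_HOSTS) are the ones the scan reports.

```typescript
import { timingSafeEqual } from "node:crypto";

// Hypothetical guard combining the two checks the scan describes:
// Host-header validation (DNS-rebinding protection) and API-key auth.
function checkRequest(
  hostHeader: string,
  presentedKey: string,
  apiKey: string,          // would come from MCP_API_KEY (min 32 chars)
  allowedHosts: string[],  // would come from MCP_ALLOWED_HOSTS
): boolean {
  // Reject unknown Host headers to block DNS-rebinding attacks.
  const host = hostHeader.split(":")[0];
  if (!allowedHosts.includes(host)) return false;
  // Compare keys in constant time to avoid a timing side channel.
  const a = Buffer.from(apiKey);
  const b = Buffer.from(presentedKey);
  return a.length === b.length && timingSafeEqual(a, b);
}
```

The constant-time comparison matters because a naive `===` on secrets can leak key prefixes through response timing.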
Tool Schema Quality
MEDIUM: Only 13 of 50 schemas have parameter constraints
Most schemas lack maxLength, enum, or pattern constraints on string parameters.
Add constraints to string parameters, especially on write operations.
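For a topic-name parameter, the difference looks like this in plain JSON Schema. The constrained version is illustrative, not the server's actual schema; 249 characters and the `[a-zA-Z0-9._-]` character set are Kafka's real topic-name rules.

```typescript
// An unconstrained string parameter accepts anything the model emits.
const unconstrained = { type: "string" };

// The same parameter with Kafka's actual topic-name rules encoded.
const constrained = {
  type: "string",
  minLength: 1,
  maxLength: 249,               // Kafka rejects longer topic names
  pattern: "^[a-zA-Z0-9._-]+$", // legal topic-name characters only
};
```

Constraints like these let malformed or injected values fail schema validation before they ever reach a write operation.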
CRITICAL: Dangerous execution surface. create-flink-statement accepts arbitrary Flink SQL statements (max 131072 chars) that can create, alter, or drop tables and run arbitrary queries.
Tool allows raw code/query execution which could be exploited via prompt injection.
Use parameterized queries or validated command sets.
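A validated command set could look like the sketch below. This is a hypothetical guard, not part of mcp-confluent: it admits only read-only Flink SQL verbs and rejects chained statements before anything is sent to the Statements API.

```typescript
// Allow-list of read-only Flink SQL verbs (hypothetical policy).
const READ_ONLY = /^\s*(SELECT|SHOW|DESCRIBE|EXPLAIN)\b/i;

function isReadOnlyStatement(sql: string): boolean {
  // Drop leading "--" line comments so "-- note\nDROP TABLE t" is not misread.
  const stripped = sql.replace(/^\s*--[^\n]*\n?/gm, "").trim();
  // Reject anything after a statement terminator ("SELECT 1; DROP ...").
  return READ_ONLY.test(stripped) && !/;\s*\S/.test(stripped);
}
```

An allow-list like this does not eliminate the risk (a SELECT can still exfiltrate data), but it removes the create/alter/drop surface that makes prompt injection catastrophic.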
LLM Safety
MEDIUM: 3 tool descriptions are too vague
Short or generic descriptions make tool selection unreliable.
Expand descriptions with specific actions, data types, and side effects.
HIGH: Tool descriptions contain instructional language
Descriptions include directives that could influence LLM behavior beyond tool selection.
Remove instructional language. Descriptions should be purely factual.
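For example, the list-connectors entry in the tools table below tells the model what to do next; a factual rewrite (hypothetical wording) describes only what the tool returns.

```typescript
// Actual description from the tools table: contains a next-step directive.
const instructional =
  'Retrieve a list of "names" of the active connectors. You can then make ' +
  "a read request for a specific connector by name.";

// Hypothetical factual rewrite: states what is returned, nothing more.
const factual =
  "Returns the names of all active connectors in the current Confluent " +
  "Cloud environment. Read-only; no side effects.";
```

The factual version still gives the LLM everything it needs for tool selection without steering its subsequent actions.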
Data Exposure
MEDIUM: 5 list operations lack pagination
Flink statement listing is well paginated (pageSize max 100, pageToken), as are the environments and billing tools, and the Tableflow list tools mention pagination in their descriptions. However, list-topics returns all topics with no pagination. consume-messages limits exposure via configurable maxMessages (default 10) and timeoutMs (default 10000 ms). No tool supports field selection.
Add limit/offset or cursor-based pagination.
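A cursor-based scheme for list-topics could mirror the pageSize/pageToken pattern the Flink tools already use. The function below is a sketch under that assumption, not the server's implementation; the token is just a base64-encoded offset.

```typescript
// Hypothetical cursor pagination for list-topics: cap the page size at 100
// (matching the Flink listing's maximum) and return an opaque pageToken.
function paginateTopics(
  topics: string[],
  pageSize = 100,
  pageToken?: string,
): { topics: string[]; nextPageToken?: string } {
  const size = Math.min(Math.max(pageSize, 1), 100);
  // Decode the opaque cursor back into an offset (0 on the first page).
  const offset = pageToken
    ? parseInt(Buffer.from(pageToken, "base64").toString("utf8"), 10)
    : 0;
  const page = topics.slice(offset, offset + size);
  const nextOffset = offset + size;
  return {
    topics: page,
    // Only emit a token while more pages remain.
    nextPageToken:
      nextOffset < topics.length
        ? Buffer.from(String(nextOffset)).toString("base64")
        : undefined,
  };
}
```

An opaque cursor keeps the server free to change its internal paging strategy later without breaking callers that just echo the token back.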
LOW: No field selection on responses
Responses return full records rather than projected fields.
Implement field selection to return only relevant fields.
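A minimal field-selection sketch, assuming a hypothetical `fields` parameter on list/read tools (not something mcp-confluent currently exposes):

```typescript
// Project a response record down to only the requested keys; with no
// `fields` argument, behave as today and return the full record.
function projectFields<T extends Record<string, unknown>>(
  record: T,
  fields?: string[],
): Partial<T> {
  if (!fields || fields.length === 0) return record;
  return Object.fromEntries(
    Object.entries(record).filter(([key]) => fields.includes(key)),
  ) as Partial<T>;
}
```

Projection trims what lands in the LLM's context window, which both reduces data exposure and cuts token usage on large cluster or connector objects.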
Tools
50 total
| Name | Description | Risk |
|---|---|---|
| list-topics | List all topics in the Kafka cluster. | read |
| create-topics | Create one or more Kafka topics. | write |
| delete-topics | Delete the topics with the given names. | admin |
| produce-message | Produce records to a Kafka topic. Supports Confluent Schema Registry serialization (AVRO, JSON, PROTOBUF) for both key and value. | write |
| consume-messages | Consumes messages from one or more Kafka topics. Supports automatic deserialization of Schema Registry encoded messages (AVRO, JSON, PROTOBUF). | read |
| alter-topic-config | Alter topic configuration in Confluent Cloud. | write |
| get-topic-config | Retrieve configuration details for a specific Kafka topic. | read |
| list-flink-statements | Retrieve a sorted, filtered, paginated list of all statements. | read |
| create-flink-statement | Make a request to create a statement. | write |
| read-flink-statement | Make a request to read a statement and its results. | read |
| delete-flink-statements | Make a request to delete a statement. | admin |
| get-flink-statement-exceptions | Retrieve the 10 most recent exceptions for a Flink SQL statement. Useful for diagnosing failed or failing statements. | read |
| list-flink-catalogs | List all catalogs available in the Flink environment via INFORMATION_SCHEMA.CATALOGS. | read |
| list-flink-databases | List all databases (schemas) in a Flink catalog via INFORMATION_SCHEMA.SCHEMATA. Returns catalog and database names. | read |
| list-flink-tables | List all tables in a Flink database via INFORMATION_SCHEMA.TABLES. Returns table names and types. | read |
| describe-flink-table | Get full schema details for a Flink table via INFORMATION_SCHEMA.COLUMNS. Returns column names, data types (including $rowtime), nullability, and metadata column info. | read |
| get-flink-table-info | Get table metadata via INFORMATION_SCHEMA.TABLES. Returns watermark configuration, distribution info, and table type. | read |
| check-flink-statement-health | Perform an aggregate health check for a Flink SQL statement. Returns status (healthy/warning/critical), current phase, recent exceptions, and diagnostic details. | read |
| detect-flink-statement-issues | Detect issues for a Flink SQL statement by analyzing status, exceptions, and performance metrics. Identifies problems like failures, backpressure, consumer lag, late data, memory issues, and provides suggested fixes. | read |
| get-flink-statement-profile | Get Query Profiler data for a Flink SQL statement. Returns the task graph with human-readable task/operator names, per-task metrics (records in/out, state size, busyness, idleness, backpressure, watermarks), and automated issue detection (backpressure bottlenecks, consumer lag, late data, large state). | read |
| list-connectors | Retrieve a list of "names" of the active connectors. You can then make a read request for a specific connector by name. | read |
| read-connector | Get information about the connector. | read |
| create-connector | Create a new connector. Returns the new connector information if successful. | write |
| delete-connector | Delete an existing connector. Returns success message if deletion was successful. | admin |
| search-topics-by-tag | List all topics in the Kafka cluster with the specified tag. | read |
| search-topics-by-name | List all topics in the Kafka cluster matching the specified name. | read |
| create-topic-tags | Create new tag definitions in Confluent Cloud. | write |
| delete-tag | Delete a tag definition from Confluent Cloud. | admin |
| remove-tag-from-entity | Remove tag from an entity in Confluent Cloud. | write |
| add-tags-to-topic | Assign existing tags to Kafka topics in Confluent Cloud. | write |
| list-tags | Retrieve all tags with definitions from Confluent Cloud Schema Registry. | read |
| list-clusters | Get all clusters in the Confluent Cloud environment | read |
| list-environments | Get all environments in Confluent Cloud with pagination support | read |
| read-environment | Get details of a specific environment by ID | read |
| list-schemas | List all schemas in the Schema Registry. | read |
| delete-schema | Delete a schema subject or a specific version from the Schema Registry. If version is omitted, all versions of the subject are deleted. | admin |
| create-tableflow-topic | Make a request to create a tableflow topic. | write |
| list-tableflow-regions | Retrieve a sorted, filtered, paginated list of all tableflow regions. | read |
| list-tableflow-topics | Retrieve a sorted, filtered, paginated list of all tableflow topics. | read |
| read-tableflow-topic | Make a request to read a tableflow topic. | read |
| update-tableflow-topic | Make a request to update a tableflow topic. | write |
| delete-tableflow-topic | Make a request to delete a tableflow topic. | admin |
| create-tableflow-catalog-integration | Make a request to create a catalog integration. | write |
| list-tableflow-catalog-integrations | Retrieve a sorted, filtered, paginated list of all catalog integrations. | read |
| read-tableflow-catalog-integration | Make a request to read a catalog integration. | read |
| update-tableflow-catalog-integration | Make a request to update a catalog integration. | write |
| delete-tableflow-catalog-integration | Make a request to delete a tableflow catalog integration. | admin |
| list-billing-costs | Retrieve billing cost data for a Confluent Cloud organization within a specified date range with pagination support | read |
| query-metrics | Query Confluent Cloud metrics from the Telemetry API. IMPORTANT: Use the list-available-metrics tool first to discover valid metric names and filter fields. Supports Kafka, Flink, Connectors, and Schema Registry metrics with flexible filtering, aggregation, and grouping. | read |
| list-available-metrics | List available Confluent Cloud metrics and their filter fields from the Telemetry API. Use this tool BEFORE query-metrics to discover valid metric names, resource filter fields, and grouping labels. | read |
Deploy Confluent MCP Server securely
CompleteFlow adds per-user authentication, permission scoping, and audit logging to any MCP server out of the box.
Deploy on CompleteFlow