File: //lib/fm-agent/plugins/apache_kafka.py
import agent_util
import logging
import traceback
logger = logging.getLogger(__name__)

# JMX ObjectNames referenced by more than one metric below.
_BYTES_IN_BEAN = "kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec"
_BYTES_OUT_BEAN = "kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec"
_MESSAGES_IN_BEAN = "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec"
_MEMORY_BEAN = "java.lang:type=Memory"

# Agent metric textkey -> (JMX ObjectName, attribute name, composite key).
# The composite key is None for attributes that are read directly; otherwise
# it names the single field check() reads out of the composite attribute
# value (e.g. "used" out of HeapMemoryUsage).
JMX_MAPPING = {
    # Bytes in per second, per topic.
    "broker.bips.oneminuterate": (_BYTES_IN_BEAN, "OneMinuteRate", None),
    "broker.bips.fiveminuterate": (_BYTES_IN_BEAN, "FiveMinuteRate", None),
    "broker.bips.fifteenminuterate": (_BYTES_IN_BEAN, "FifteenMinuteRate", None),
    "broker.bips.meanrate": (_BYTES_IN_BEAN, "MeanRate", None),
    # Bytes out per second, per topic.
    "broker.bops.oneminuterate": (_BYTES_OUT_BEAN, "OneMinuteRate", None),
    "broker.bops.fiveminuterate": (_BYTES_OUT_BEAN, "FiveMinuteRate", None),
    "broker.bops.fifteenminuterate": (_BYTES_OUT_BEAN, "FifteenMinuteRate", None),
    "broker.bops.meanrate": (_BYTES_OUT_BEAN, "MeanRate", None),
    # Messages in per second, per topic.
    "broker.mips.oneminuterate": (_MESSAGES_IN_BEAN, "OneMinuteRate", None),
    "broker.mips.fiveminuterate": (_MESSAGES_IN_BEAN, "FiveMinuteRate", None),
    "broker.mips.fifteenminuterate": (_MESSAGES_IN_BEAN, "FifteenMinuteRate", None),
    "broker.mips.meanrate": (_MESSAGES_IN_BEAN, "MeanRate", None),
    # Broker-wide gauges.
    "underreplicatedpartitions": (
        "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
        "Value",
        None,
    ),
    "fetch.queue-size": ("kafka.server:type=Fetch", "queue-size", None),
    # JVM heap usage, read field-by-field from the HeapMemoryUsage attribute.
    "memory.heap.committed": (_MEMORY_BEAN, "HeapMemoryUsage", "committed"),
    "memory.heap.used": (_MEMORY_BEAN, "HeapMemoryUsage", "used"),
    "memory.heap.max": (_MEMORY_BEAN, "HeapMemoryUsage", "max"),
}
####
def discover_beans(connection):
allowed_beans = [
"kafka.server:type=BrokerTopicMetrics",
"kafka.server:type=ReplicaManager",
"kafka.log:type=LogFlushStats",
"java.lang:type=Memory",
"kafka.server:type=Fetch",
]
ignored_topics = ["__consumer_offsets", "ReplicaFetcherThread-0-2"]
discovered_beans = []
discovered_topics = []
avail_beans = connection.queryMBeans(None, None)
for bean in avail_beans:
name = bean.objectName.toString()
if any(b in name for b in allowed_beans):
discovered_beans.append(bean.toString())
if "topic" in name:
topic = name.split(",")[2].split("=")[1]
if topic not in ignored_topics:
discovered_topics.append(topic)
return discovered_beans, list(set(discovered_topics))
class ApacheKafkaPlugin(agent_util.Plugin):
    """Agent plugin that reads Apache Kafka broker metrics over JMX.

    A JVM is hosted in-process through JPype and connects to the broker's
    JMX RMI endpoint described by the ``[jmx]`` config block: ``host`` and
    ``port`` (required), plus optional ``jvm_path``, ``username`` and
    ``password``.
    """

    textkey = "apache_kafka_jmx"
    label = "Apache Kafka (JMX)"
    # MBean server connection opened by the most recent get_metadata() call.
    connection = None

    # (textkey, label, unit, per_topic) for every metric this plugin offers,
    # in the order they appear in the metadata. per_topic metrics get the
    # discovered topics as options; a unit of None means no "unit" entry.
    _METRICS = (
        ("broker.bips.oneminuterate", "Topic Bytes In/sec - 1 min", "bytes", True),
        ("broker.bips.fiveminuterate", "Topic Bytes In/sec - 5 min", "bytes", True),
        ("broker.bips.fifteenminuterate", "Topic Bytes In/sec - 15 min", "bytes", True),
        ("broker.bips.meanrate", "Topic Bytes In/sec - Avg", "bytes", True),
        ("broker.bops.oneminuterate", "Topic Bytes Out/sec - 1 min", "bytes", True),
        ("broker.bops.fiveminuterate", "Topic Bytes Out/sec - 5 min", "bytes", True),
        ("broker.bops.fifteenminuterate", "Topic Bytes Out/sec - 15 min", "bytes", True),
        ("broker.bops.meanrate", "Topic Bytes Out/sec - Avg", "bytes", True),
        ("broker.mips.oneminuterate", "Topic Messages In/sec - 1 min", "messages", True),
        ("broker.mips.fiveminuterate", "Topic Messages In/sec - 5 min", "messages", True),
        ("broker.mips.fifteenminuterate", "Topic Messages In/sec - 15 min", "messages", True),
        ("broker.mips.meanrate", "Topic Messages In/sec - Avg", "messages", True),
        ("underreplicatedpartitions", "Replica Manager Unreplicated Partitions", "partitions", False),
        ("fetch.queue-size", "Queued messages", None, False),
        ("memory.heap.committed", "Heap Memory - Committed", "bytes", False),
        ("memory.heap.used", "Heap Memory - Used", "bytes", False),
        ("memory.heap.max", "Heap Memory - Max", "bytes", False),
    )

    _JVM_MISSING_MSG = "Unable to find JVM, please specify 'jvm_path' in the [jmx] block of the agent config file."

    @classmethod
    def _start_jvm(cls, config, jpype):
        """Start the embedded JVM unless it is already running.

        The JVM library is ``config["jvm_path"]`` when set, otherwise JPype's
        platform default. Returns None on success, or a user-facing error
        message describing why the JVM could not be started.
        """
        if jpype.isJVMStarted():
            # The path only matters for the first start; don't re-resolve it.
            return None
        if "jvm_path" in config:
            jvm_path = config["jvm_path"]
        else:
            try:
                jvm_path = jpype.getDefaultJVMPath()
            except Exception:
                cls.log.error(traceback.format_exc())
                jvm_path = None
        if not jvm_path:
            return cls._JVM_MISSING_MSG
        try:
            jpype.startJVM(jvm_path)
        except Exception:
            cls.log.error(traceback.format_exc())
            return "Unable to access JMX metrics because JVM cannot be started."
        return None

    @staticmethod
    def _open_connector(config):
        """Open a JMXConnector to ``config["host"]``:``config["port"]``.

        Credentials are sent only when both ``username`` and ``password`` are
        set. Requires a running JVM. The caller owns the returned connector
        and is responsible for closing it.
        """
        import jpype
        from jpype import java, javax

        env = java.util.HashMap()
        if config.get("username") and config.get("password"):
            credentials = jpype.JArray(java.lang.String)(
                [config["username"], config["password"]]
            )
            env.put(javax.management.remote.JMXConnector.CREDENTIALS, credentials)
        url = "service:jmx:rmi:///jndi/rmi://%s:%d/jmxrmi" % (
            config["host"],
            int(config["port"]),
        )
        return javax.management.remote.JMXConnectorFactory.connect(
            javax.management.remote.JMXServiceURL(url), env
        )

    @classmethod
    def get_metadata(cls, config):
        """Describe the metrics this plugin offers and whether they are usable.

        ``config`` is the ``[jmx]`` config block. Returns {} when there is no
        config or the JMX endpoint cannot be reached. Otherwise returns a dict
        keyed by metric textkey whose entries hold the label, topic options
        (per-topic metrics only), status, last error message and, where
        defined, the unit. When misconfigured, every entry is returned with
        status MISCONFIGURED and no topic options.
        """
        status = agent_util.SUPPORTED
        msg = None

        if not config:
            cls.log.info("No JMX configuration found")
            return {}

        # jpype is optional; without it every metric is reported misconfigured.
        try:
            import jpype
            from jpype import java, javax  # noqa: F401 - availability check only
        except Exception:
            jpype = None
            msg = "Unable to access JMX metrics due to missing jpype library."
            cls.log.info(msg)
            status = agent_util.MISCONFIGURED

        # Check for required settings in the jmx configuration block.
        for key in ("port", "host"):
            if key not in config:
                msg = (
                    "Missing value for %s in the [jmx] block of the agent config file."
                    % key
                )
                cls.log.info(msg)
                status = agent_util.MISCONFIGURED

        if status == agent_util.SUPPORTED:
            error = cls._start_jvm(config, jpype)
            if error:
                msg = error
                cls.log.info(msg)
                status = agent_util.MISCONFIGURED

        topics = []
        # Only talk to the broker when everything above succeeded; otherwise
        # there is no connection to discover topics on.
        if status == agent_util.SUPPORTED:
            try:
                cls.connection = cls._open_connector(config).getMBeanServerConnection()
            except Exception:
                cls.log.info(
                    "Unable to access JMX metrics, JMX is not running or not installed."
                )
                return {}
            try:
                _beans, topics = discover_beans(cls.connection)
            except Exception:
                # Still report the metrics, just without topic options.
                cls.log.error(traceback.format_exc())
                topics = []

        metadata = {}
        for textkey, label, unit, per_topic in cls._METRICS:
            entry = {
                "label": label,
                "options": topics if per_topic else None,
                "status": status,
                "error_message": msg,
            }
            if unit is not None:
                entry["unit"] = unit
            metadata[textkey] = entry
        return metadata

    def check(self, textkey, data, config):
        """Fetch the current value of one Kafka metric over JMX.

        ``textkey`` must be a key of JMX_MAPPING. ``data`` is the selected
        option; when truthy it is appended to the ObjectName as
        ``,topic=<data>``. Returns the value as a float, or None if anything
        fails (every failure is logged).
        """
        try:
            import jpype
            from jpype import javax
        except Exception:
            self.log.error("Unable to import jpype! Is it installed?")
            return None

        parts = JMX_MAPPING.get(textkey)
        if not parts:
            self.log.error(
                "Unable to find Kafka metric %s in known metrics!" % textkey
            )
            return None
        bean_name, attribute, composite_key = parts
        if data:
            bean_name += ",topic=%s" % data

        connector = None
        try:
            error = self._start_jvm(config, jpype)
            if error:
                self.log.error(error)
                return None
            connector = self._open_connector(config)
            connection = connector.getMBeanServerConnection()
            res = connection.getAttribute(
                javax.management.ObjectName(bean_name), attribute
            )
            if composite_key is None:
                self.log.debug("Checking Kafka metric %s" % attribute)
                return res.floatValue()
            self.log.debug(
                "Checking Kafka metric %s with key %s" % (attribute, composite_key)
            )
            # Composite attributes (e.g. HeapMemoryUsage) come back as
            # CompositeData; read the single requested field via its public API.
            return res.get(composite_key).floatValue()
        except Exception:
            self.log.critical(
                "Error checking Kafka metric %s - %s \n%s"
                % (textkey, data, traceback.format_exc())
            )
            return None
        finally:
            # Each check opens its own connector; close it so remote JMX
            # connections do not accumulate across checks.
            if connector is not None:
                try:
                    connector.close()
                except Exception:
                    self.log.debug(
                        "Failed to close JMX connector:\n%s" % traceback.format_exc()
                    )