File: //proc/thread-self/root/lib/fm-agent/plugins/apache_zookeeper.py
import agent_util
import sys
import socket
import traceback
def netcat(hostname, port, content, timeout=10):
    """Send ``content`` to ``hostname:port`` over TCP and return the full reply.

    The write side of the connection is shut down after sending so the peer
    sees end-of-input, then the reply is read until the peer closes the
    connection.

    Args:
        hostname: Host name or IPv4 address to connect to.
        port: TCP port number.
        content: Payload to send. Text is encoded as UTF-8; bytes are sent
            unchanged.
        timeout: Seconds to wait on connect/send/receive before the socket
            raises a timeout error. ``None`` blocks indefinitely.

    Returns:
        The complete reply as a native ``str`` (empty if the peer sent
        nothing).

    Raises:
        socket.error: On connection failure or timeout.
    """
    # On Python 2 ``str`` is already ``bytes``; only real text gets encoded.
    if not isinstance(content, bytes):
        content = content.encode("utf-8")

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.settimeout(timeout)
        sock.connect((hostname, port))
        sock.sendall(content)
        sock.shutdown(socket.SHUT_WR)

        # A single recv() may return only part of the reply, so keep reading
        # until the peer signals EOF with an empty read.
        chunks = []
        while True:
            chunk = sock.recv(4096)
            if not chunk:
                break
            chunks.append(chunk)
    finally:
        # Always release the descriptor, even when connect/send/recv raised.
        sock.close()

    response = b"".join(chunks)
    # Python 3: hand text back so callers can compare against str literals.
    if not isinstance(response, str):
        response = response.decode("utf-8", "replace")
    return response
def nc_to_dict(data):
    """Parse newline-separated, tab-delimited ``key<TAB>value`` text into a dict.

    Lines that contain no tab are skipped. When a line has more than two
    tab-separated fields, only the first two are used. If a key appears on
    several lines, the last occurrence wins.

    Args:
        data: The text to parse. ``bytes`` input is decoded as UTF-8 first.

    Returns:
        A dict mapping each line's first field to its second field, both as
        strings.
    """
    # Accept raw socket bytes on Python 3 (on Python 2, bytes is str already).
    if isinstance(data, bytes) and not isinstance(data, str):
        data = data.decode("utf-8", "replace")

    parsed = {}
    for line in data.split("\n"):
        fields = line.split("\t")
        if len(fields) < 2:
            # Blank line or a line without a key/value separator.
            continue
        parsed[fields[0]] = fields[1]
    return parsed
class ApacheZookeeperPlugin(agent_util.Plugin):
    """Agent plugin that reads Apache ZooKeeper statistics.

    Values are obtained by sending ZooKeeper's short text commands
    (``envi``, ``ruok``, ``mntr``) to the host/port given in the plugin's
    config block, using the module-level ``netcat`` helper.
    """

    textkey = "apache_zookeeper"
    label = "Apache Zookeeper"

    # (textkey, label, unit) for every metric advertised by get_metadata().
    _METRICS = (
        ("avg_latency", "Average request latency", "ms"),
        ("max_latency", "Maximum request latency", "ms"),
        ("packets_received", "Packets received", "packets"),
        ("packets_sent", "Packets sent", "packets"),
        ("packets_received_per_sec", "Packets received/sec", "packets/sec"),
        ("packets_sent_per_sec", "Packets sent/sec", "packets/sec"),
        ("outstanding_requests", "Outstanding Requests", ""),
        ("server_state", "Server Mode", ""),
        ("znode_count", "Node count", ""),
        ("watch_count", "Watcher Count", ""),
        ("approximate_data_size", "Approximate data size", "bytes"),
        ("open_file_descriptor_count", "Open file descriptors", "files"),
        ("fsync_threshold_exceed_count", "Slow fsync count", ""),
        ("ruok", "Node error state", ""),
    )

    @staticmethod
    def _as_text(raw):
        """Return ``raw`` as a native string, decoding Python 3 bytes as UTF-8."""
        if isinstance(raw, bytes) and not isinstance(raw, str):
            return raw.decode("utf-8", "replace")
        return raw

    @staticmethod
    def _to_float(value):
        """Return ``value`` as a float, or None if it is missing or non-numeric."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return None

    @classmethod
    def get_metadata(self, config):
        """Describe the metrics this plugin can report.

        Args:
            config: Mapping holding this plugin's config entries; ``host``
                and ``port`` are required.

        Returns:
            A dict keyed by metric textkey, each value holding ``label``,
            ``options``, ``status``, ``error_message`` and ``unit``. An empty
            dict is returned when host/port are not configured, when the
            ``envi`` probe raises, or when the probe yields no reply.
        """
        # NOTE: this is a classmethod, so ``self`` is the class itself here.
        if "host" not in config or "port" not in config:
            msg = "The host and port entries were not found in the [apache_zookeeper] block in the agent config file."
            self.log.info(msg)
            return {}

        # Probe the server once; any reply at all counts as "reachable".
        try:
            response = self._as_text(
                netcat(config["host"], int(config["port"]), b"envi")
            )
        except Exception:
            # log.exception already records the active traceback.
            self.log.exception("Error running Zookeeper hello")
            return {}

        if response is None or response == "":
            # No exception is being handled here, so use error() rather than
            # exception(), which would append a meaningless empty traceback.
            self.log.error("Bad response running Zookeeper hello")
            return {}

        status = agent_util.SUPPORTED
        msg = None
        return {
            key: {
                "label": metric_label,
                "options": None,
                "status": status,
                "error_message": msg,
                "unit": unit,
            }
            for key, metric_label, unit in self._METRICS
        }

    def check(self, textkey, data, config):
        """Return the current value of the metric named by ``textkey``.

        Args:
            textkey: One of the metric keys advertised by ``get_metadata``.
            data: Unused by this method.
            config: Mapping holding this plugin's ``host`` and ``port``.

        Returns:
            * ``ruok``: 0 when the server answers ``imok``, otherwise 1.
            * ``server_state``: 0 for ``follower``, 1 for ``leader``, None
              for any other or missing state.
            * ``*_per_sec``: the increase of the underlying counter since the
              previously cached sample divided by the cached time delta;
              0 when the counter is missing or not positive; None when no
              earlier sample exists, the delta is not positive, or the
              counter went backwards.
            * anything else: the ``zk_<textkey>`` value from ``mntr`` as a
              float, or None when it is missing or not numeric.
        """
        host = config["host"]
        port = int(config["port"])

        if textkey == "ruok":
            reply = self._as_text(netcat(host, port, b"ruok"))
            return 0 if reply == "imok" else 1

        stats = nc_to_dict(self._as_text(netcat(host, port, b"mntr")))
        # Rate metrics are derived from the counter of the same name without
        # the "_per_sec" suffix.
        key = "zk_" + textkey.replace("_per_sec", "")
        value = stats.get(key)

        if textkey == "server_state":
            if value == "follower":
                return 0
            if value == "leader":
                return 1
            # Other modes have no numeric mapping defined here.
            return None

        if textkey in ("packets_received_per_sec", "packets_sent_per_sec"):
            self.log.debug(stats)
            current = self._to_float(value)
            if current is None or current <= 0:
                return 0

            cached = self.get_cache_results(textkey, None)
            self.cache_result(textkey, None, value, replace=True)
            self.log.debug("####\nGot cached result!\n%s", cached)
            if not cached:
                # First sample: nothing to compare against yet.
                return None

            # NOTE(review): each cache entry is unpacked as (delta, value);
            # delta is presumably the seconds elapsed since that value was
            # stored - confirm against agent_util.Plugin.get_cache_results.
            delta, previous = cached[0]
            delta = self._to_float(delta)
            previous = self._to_float(previous)
            if delta is None or previous is None or delta <= 0:
                return None
            if current < previous:
                # Counter went backwards (e.g. stats reset or restart).
                return None
            return (current - previous) / delta

        return self._to_float(value)