nut/scripts/python/module/PyNUT.py.in
2022-06-29 12:37:36 +02:00

351 lines
13 KiB
Python

#!@PYTHON@
# -*- coding: utf-8 -*-
# Copyright (C) 2008 David Goncalves <david@lestat.st>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# 2008-01-14 David Goncalves
# PyNUT is an abstraction class to access NUT (Network UPS Tools) server.
#
# 2008-06-09 David Goncalves
# Added 'GetRWVars' and 'SetRWVar' commands.
#
# 2009-02-19 David Goncalves
# Changed class PyNUT to PyNUTClient
#
# 2010-07-23 David Goncalves - Version 1.2
# Changed GetRWVars function that fails is the UPS is not
# providing such vars.
#
# 2011-07-05 René Martín Rodríguez <rmrodri@ull.es> - Version 1.2.1
# Added support for FSD, HELP and VER commands
#
# 2012-02-07 René Martín Rodríguez <rmrodri@ull.es> - Version 1.2.2
# Added support for LIST CLIENTS command
#
# 2014-06-03 george2 - Version 1.3.0
# Added custom exception class, fixed minor bug, added Python 3 support.
#
# 2021-09-27 Jim Klimov <jimklimov+nut@gmail.com> - Version 1.4.0
# Revise strings used to be byte sequences as required by telnetlib
# in Python 3.9, by spelling out b"STR" or str.encode('ascii');
# the change was also tested to work with Python 2.7, 3.4, 3.5 and
# 3.7 (to the extent of accompanying test_nutclient.py at least).
import telnetlib
class PyNUTError( Exception ) :
""" Base class for custom exceptions """
class PyNUTClient :
""" Abstraction class to access NUT (Network UPS Tools) server """
__debug = None # Set class to debug mode (prints everything useful for debuging...)
__host = None
__port = None
__login = None
__password = None
__timeout = None
__srv_handler = None
__version = "1.4.0"
__release = "2021-09-27"
def __init__( self, host="127.0.0.1", port=3493, login=None, password=None, debug=False, timeout=5 ) :
""" Class initialization method
host : Host to connect (default to localhost)
port : Port where NUT listens for connections (default to 3493)
login : Login used to connect to NUT server (default to None for no authentication)
password : Password used when using authentication (default to None)
debug : Boolean, put class in debug mode (prints everything on console, default to False)
timeout : Timeout used to wait for network response
"""
self.__debug = debug
if self.__debug :
print( "[DEBUG] Class initialization..." )
print( "[DEBUG] -> Host = %s (port %s)" % ( host, port ) )
print( "[DEBUG] -> Login = '%s' / '%s'" % ( login, password ) )
self.__host = host
self.__port = port
self.__login = login
self.__password = password
self.__timeout = 5
self.__connect()
# Try to disconnect cleanly when class is deleted ;)
def __del__( self ) :
""" Class destructor method """
try :
self.__srv_handler.write( b"LOGOUT\n" )
except :
pass
def __connect( self ) :
""" Connects to the defined server
If login/pass was specified, the class tries to authenticate. An error is raised
if something goes wrong.
"""
if self.__debug :
print( "[DEBUG] Connecting to host" )
self.__srv_handler = telnetlib.Telnet( self.__host, self.__port )
if self.__login != None :
self.__srv_handler.write( ("USERNAME %s\n" % self.__login).encode('ascii') )
result = self.__srv_handler.read_until( b"\n", self.__timeout )
if result[:2] != b"OK" :
raise PyNUTError( result.replace( b"\n", b"" ) )
if self.__password != None :
self.__srv_handler.write( ("PASSWORD %s\n" % self.__password).encode('ascii') )
result = self.__srv_handler.read_until( b"\n", self.__timeout )
if result[:2] != b"OK" :
raise PyNUTError( result.replace( b"\n", b"" ) )
def GetUPSList( self ) :
""" Returns the list of available UPS from the NUT server
The result is a dictionary containing 'key->val' pairs of 'UPSName' and 'UPS Description'
"""
if self.__debug :
print( "[DEBUG] GetUPSList from server" )
self.__srv_handler.write( b"LIST UPS\n" )
result = self.__srv_handler.read_until( b"\n" )
if result != b"BEGIN LIST UPS\n" :
raise PyNUTError( result.replace( b"\n", b"" ) )
result = self.__srv_handler.read_until( b"END LIST UPS\n" )
ups_list = {}
for line in result.split( b"\n" ) :
if line[:3] == b"UPS" :
ups, desc = line[4:-1].split( b'"' )
ups_list[ ups.replace( b" ", b"" ) ] = desc
return( ups_list )
def GetUPSVars( self, ups="" ) :
""" Get all available vars from the specified UPS
The result is a dictionary containing 'key->val' pairs of all
available vars.
"""
if self.__debug :
print( "[DEBUG] GetUPSVars called..." )
self.__srv_handler.write( ("LIST VAR %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if result != ("BEGIN LIST VAR %s\n" % ups).encode('ascii') :
raise PyNUTError( result.replace( b"\n", b"" ) )
ups_vars = {}
result = self.__srv_handler.read_until( ("END LIST VAR %s\n" % ups).encode('ascii') )
offset = len( ("VAR %s " % ups ).encode('ascii') )
end_offset = 0 - ( len( ("END LIST VAR %s\n" % ups).encode('ascii') ) + 1 )
for current in result[:end_offset].split( b"\n" ) :
var = current[ offset: ].split( b'"' )[0].replace( b" ", b"" )
data = current[ offset: ].split( b'"' )[1]
ups_vars[ var ] = data
return( ups_vars )
def GetUPSCommands( self, ups="" ) :
""" Get all available commands for the specified UPS
The result is a dict object with command name as key and a description
of the command as value
"""
if self.__debug :
print( "[DEBUG] GetUPSCommands called..." )
self.__srv_handler.write( ("LIST CMD %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if result != ("BEGIN LIST CMD %s\n" % ups).encode('ascii') :
raise PyNUTError( result.replace( b"\n", b"" ) )
ups_cmds = {}
result = self.__srv_handler.read_until( ("END LIST CMD %s\n" % ups).encode('ascii') )
offset = len( ("CMD %s " % ups).encode('ascii') )
end_offset = 0 - ( len( ("END LIST CMD %s\n" % ups).encode('ascii') ) + 1 )
for current in result[:end_offset].split( b"\n" ) :
var = current[ offset: ].split( b'"' )[0].replace( b" ", b"" )
# For each var we try to get the available description
try :
self.__srv_handler.write( ("GET CMDDESC %s %s\n" % ( ups, var )).encode('ascii') )
temp = self.__srv_handler.read_until( b"\n" )
if temp[:7] != b"CMDDESC" :
raise PyNUTError
else :
off = len( ("CMDDESC %s %s " % ( ups, var )).encode('ascii') )
desc = temp[off:-1].split(b'"')[1]
except :
desc = var
ups_cmds[ var ] = desc
return( ups_cmds )
def GetRWVars( self, ups="" ) :
""" Get a list of all writable vars from the selected UPS
The result is presented as a dictionary containing 'key->val' pairs
"""
if self.__debug :
print( "[DEBUG] GetUPSVars from '%s'..." % ups )
self.__srv_handler.write( ("LIST RW %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result != ("BEGIN LIST RW %s\n" % ups).encode('ascii') ) :
raise PyNUTError( result.replace( b"\n", b"" ) )
result = self.__srv_handler.read_until( ("END LIST RW %s\n" % ups).encode('ascii') )
offset = len( ("VAR %s" % ups).encode('ascii') )
end_offset = 0 - ( len( ("END LIST RW %s\n" % ups).encode('ascii') ) + 1 )
rw_vars = {}
try :
for current in result[:end_offset].split( b"\n" ) :
var = current[ offset: ].split( b'"' )[0].replace( b" ", b"" )
data = current[ offset: ].split( b'"' )[1]
rw_vars[ var ] = data
except :
pass
return( rw_vars )
def SetRWVar( self, ups="", var="", value="" ):
""" Set a variable to the specified value on selected UPS
The variable must be a writable value (cf GetRWVars) and you must have the proper
rights to set it (maybe login/password).
"""
self.__srv_handler.write( ("SET VAR %s %s %s\n" % ( ups, var, value )).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result == b"OK\n" ) :
return( "OK" )
else :
raise PyNUTError( result )
def RunUPSCommand( self, ups="", command="" ) :
""" Send a command to the specified UPS
Returns OK on success or raises an error
"""
if self.__debug :
print( "[DEBUG] RunUPSCommand called..." )
self.__srv_handler.write( ("INSTCMD %s %s\n" % ( ups, command )).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result == b"OK\n" ) :
return( "OK" )
else :
raise PyNUTError( result.replace( b"\n", b"" ) )
def FSD( self, ups="") :
""" Send FSD command
Returns OK on success or raises an error
NOTE: API changed since NUT 2.8.0 to replace MASTER with PRIMARY
(and backwards-compatible alias handling)
"""
if self.__debug :
print( "[DEBUG] PRIMARY called..." )
self.__srv_handler.write( ("PRIMARY %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result != b"OK PRIMARY-GRANTED\n" ) :
if self.__debug :
print( "[DEBUG] Retrying: MASTER called..." )
self.__srv_handler.write( ("MASTER %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result != b"OK MASTER-GRANTED\n" ) :
raise PyNUTError( ( "Primary level functions are not available", "" ) )
if self.__debug :
print( "[DEBUG] FSD called..." )
self.__srv_handler.write( ("FSD %s\n" % ups).encode('ascii') )
result = self.__srv_handler.read_until( b"\n" )
if ( result == b"OK FSD-SET\n" ) :
return( "OK" )
else :
raise PyNUTError( result.replace( b"\n", b"" ) )
def help(self) :
""" Send HELP command
"""
if self.__debug :
print( "[DEBUG] HELP called..." )
self.__srv_handler.write( b"HELP\n" )
return self.__srv_handler.read_until( b"\n" )
def ver(self) :
""" Send VER command
"""
if self.__debug :
print( "[DEBUG] VER called..." )
self.__srv_handler.write( b"VER\n" )
return self.__srv_handler.read_until( b"\n" )
def ListClients( self, ups = None ) :
""" Returns the list of connected clients from the NUT server
The result is a dictionary containing 'key->val' pairs of 'UPSName' and a list of clients
"""
if self.__debug :
print( "[DEBUG] ListClients from server" )
if ups and (ups not in self.GetUPSList()):
raise PyNUTError( "%s is not a valid UPS" % ups )
if ups:
self.__srv_handler.write( ("LIST CLIENTS %s\n" % ups).encode('ascii') )
else:
self.__srv_handler.write( b"LIST CLIENTS\n" )
result = self.__srv_handler.read_until( b"\n" )
if result != b"BEGIN LIST CLIENTS\n" :
raise PyNUTError( result.replace( b"\n", b"" ) )
result = self.__srv_handler.read_until( b"END LIST CLIENTS\n" )
ups_list = {}
for line in result.split( b"\n" ):
if line[:6] == b"CLIENT" :
host, ups = line[7:].split(b' ')
ups.replace(b' ', b'')
if not ups in ups_list:
ups_list[ups] = []
ups_list[ups].append(host)
return( ups_list )