使用Python搭建自动化运维平台并编写运维脚本

本篇文章给大家谈谈python 运维开发都需要掌握哪些,以及python搭建自动化运维平台,希望对各位有所帮助,不要忘了收藏本站喔。

这两天在学校实训:用Python Django做学校的在线课堂系统,可谓是让我这个从未摸过Django和SQL的屌丝涨了市面快码知识。

工作之余,我无意之间翻了翻都写哪些经典的Python WEB框架:Flask、Django这些都太大了,自己手动写纯Socket工程量又太大……有没有什么好一点的这种方案呢?既可以实现轻量化,又可以不用去研究复杂的H5、JS和SQL```*``

最后还真的被我给找到了,这就是我们今天的主角:基于Python的MINI型APP框架:Streamlit高中信息技术python怎么学。

Streamlit原本的设计初衷是为了方便实现在WEB上搭建基于Python数据可视化或深度学习的APP以便于中小型团队能够快速的构建APP模型并针对APP中的BUG能够直观的查找并修正。

但是我就比较特立独行,你们既然都干这个,那我就干点花花的:用Streamlit搭建一个小型的运维监控平台。

其实我原来也曾试想着在Django的项目中嵌入一个Models去存储这些指标数据的,可是这么做的显然很不划算(只是我单纯的不想写模型罢了),于是乎借助Streamlit强大的API功能,我就开始了这个项目的构建。

环境准备

首先我们需要准备运行这个小APP的环境:

        -Anaconda3(别问我为什么不用原生Python,问就是懒得下包)

        -Streamlit:我们APP的主框架和逻辑支撑都靠它了

        -Pandas:用于存放短时数据流和部分数据表,之后会讲到

        -Psutil和Platform库:可以说没有这俩库这个APP写不成

        -Plotly:用于实现可视化效果(其实我只用了一个)

        -Pycuda:用于实现检测实例下的CUDA GPU数量极其部分属性

附上懒狗安包指令:

pip install streamlit psutil pandas pycuda plotly

准备好环境了,我们是时候开始写点东西了

主APP编写

在编写APP之前,我们需要认真的考虑下APP中UI的排版与布局,因为虽然Streamlit的API非常强大,但它提供的可选UI界面真的少的可怜,最后在我激烈的思想斗争之下,我选择了下拉菜单这种经典UI布局。

废话不多说,我上来就先构建了一个下拉菜单的实例:

#在这里使用set_page_config()方法定义APP在WEB浏览器上现实的标题和布局
st.set_page_config(page_title="Django实例--{}".format(platform.node()),
layout="wide")
#创建一个selectbox下拉菜单的实例,参数分别为盒子的名称以及其他监视对象的子菜单名称
sidebar = st.selectbox(
    "实例资源监视",
    ("基本信息","网络配置","基础监控","存储监控","弹性用户组配置","弹性安全组","GPU管理","命令终端")
)

好的,既然实例创建完成了,如何这个下拉菜单的实例动起来,或者随着我们点击不同业务单元,它就可以自动变换显示的内容,这是我们想要的结果。那么,这就需要对实例点击内容的判断了。

子APP编写

前面有提到过selectbox()方法下存放的两个参数:一个是盒子的名称,另外一个是由其他对象构成的数组,那么我们的逻辑一定是要从子对象的内容进行入手。

Streamlit的API在这一块就做的非常的贴心,你住需要一行简单到不能再简单的逻辑判断,即可实现对APP显示内容的变换。

if sidebar == "基本信息":

就这么简单!

知道怎么变化内容了,那现在就开始写些内容好咯:我的逻辑很简单,在基本信息中罗列了如下的信息:

#首先使用platform库下的system()方法获取到当前实例的运行系统内核
    system_kernel = platform.system()
    #写上标题,注意!:这个表题在前端显示的效果为H1,因此在落笔写标题的时候要慎重
    st.title("实例基本信息")
    #使用platform.node()方法获取当当前运行实例的名称
    st.write("设备名称:{}".format(platform.node()))
    # 需要申明:Linux端的返回数据和Windows端的并非相同,因此
    # 针对不同系统内核,你需要单独修改platform和psutil返回值
    # 的格式。本次APP将会以UBuntu系统下的情况为例。
    #检测到当前运行内核为Linux
    if system_kernel == "Linux":
        #使用platform.machine()方法,并截取字符串以获得到当前实例系统的运行位宽
        st.write("系统类型:{}{}位".format(system_kernel,platform.machine()[4:6]))
    #使用platform.platform()方法获取到当前实例下的系统版本
    st.write("系统版本:{}".format(platform.platform()))
    #psutil.cpu_count()方法返回当前实例的物理内核数量
    st.write("CPU:{}核".format(psutil.cpu_count()))
    #因为当前的实例是运行在虚拟机上的,因此
    #platform.processor()方法返回的是x86_64
    #而该方法在实体机下的返回值则是:Intel64 Family 6 Model 158 Stepping 10, GenuineIntel
    #可以看到在实体机下可以看到该实例下处理器的位宽,制造厂商,最大内核数以及步进(Stepping)
    st.write("CPU类型:{}".format(platform.processor()))
    #使用psutil.net_if_stats追踪到当前实例中连接到主网络的设备名称
    st.write("基础网路连接:{}".format(list(psutil.net_if_stats())[1]))
    #使用psutil.net_if_addrs()获取对应设备的IP
    st.write("IP:{}".format(list(psutil.net_if_addrs().values())[1][0][1]))
    #以上两个方法的返回值都是列表,因此与要进行切分

构建好基础信息之后我们需要开始构建网络配置下的内容了

#首先构建存放所有psutil模块下返回值的列表以供存储数据
    network_adpater_name = []
    network_adpater_isup = []
    network_adpater_workmode = []
    network_adpater_runspeed = []
    network_adpater_maximumtrans = []
    network_adpater_ip = []
    network_adpater_netmask = []
    #使用循环遍历出当前实例下所有的网络设备
    for i in range(len(list(psutil.net_if_addrs()))):
        network_adpater_name.append(list(psutil.net_if_addrs())[i])
        network_adpater_isup.append(list(psutil.net_if_stats().values())[i][0])
        #需要声明:psutil模块对应网卡工作模式的定义有些特殊,其返回值为整数数据
        #当返回值为2时,代表当前网络设备处于全双工的工作模式
        #当返回值为1时,代表当前网络设备处于半双工的工作模式
        network_adpater_workmode.append(list(psutil.net_if_stats().values())[i][1])
        network_adpater_runspeed.append(str(list(psutil.net_if_stats().values())[i][2])+"Mbps")
        network_adpater_maximumtrans.append(list(psutil.net_if_stats().values())[i][3])
        network_adpater_ip.append(list(psutil.net_if_addrs().values())[i][0][1])
        network_adpater_netmask.append(list(psutil.net_if_addrs().values())[i][0][2])
    #这里必须要用Pandas构建一个数据框,否则会很难看
    network_adpater_info = pd.DataFrame()
    network_adpater_info["网卡名称"] = network_adpater_name
    network_adpater_info["上行"] = network_adpater_isup
    network_adpater_info["工作模式"] = network_adpater_workmode
    network_adpater_info["运行速率"] = network_adpater_runspeed
    network_adpater_info["最大并行转发量"] = network_adpater_maximumtrans
    network_adpater_info["网卡IP"] = network_adpater_ip
    network_adpater_info["子网掩码"] = network_adpater_netmask
    st.title("当前Django实例网络配置信息")
    #使用StreamlitAPI下的table()方法将DataFrame数据框渲染出来
    st.table(network_adpater_info)

基础性能指标

通过对于基础性能指标参数的获取,这时我们需要引入流的概念。

通过上面对于参数获取并生成表,我们发现这种类型的数据是固定的,或者是对于动态的要求要求几乎微乎其微。

但是对于诸如:占用、速度这样矢量化数据而言。动态就显得尤为重要。

上代码:

#定义监视器函数并返回占用比
    def CPUMonitorWatchDog():
        usage = []
        while True:
            #在这里因为使用from pustil import *的引入方法
            #因此这里的cpu_percent不需要声明属于那个类下的
            #参数:interval=int用于设置psutil模块获取CPU占比的间隔
            #percpu:用于控制psutil模块在获取占用时是否将内核进行拆分
            usage.append(cpu_percent(interval=1, percpu=False))
            return usage
    def MemMonitorWatchDog():
        mem_usage = []
        while True:
            mem_usage.append(virtual_memory().percent)
            return mem_usage
    def readDiskWatchDog():
        read_speed = []
        while True:
            read_speed.append(psutil.disk_io_counters().read_bytes/1024/100)
            return read_speed
    def writeDiskWatchDog():
        write_speed = []
        while True:
            write_speed.append(disk_io_counters().write_bytes/1024/100)
            return write_speed
    def DownLoadNetMonitorWatchDog():
        download_speed = []
        while True:
            #这里net_ip_counters()中所有的返回值都是bit,因此无需转换为Byte
            #以保证
            first_recv = net_io_counters().bytes_recv
            sleep(1)
            second_recv = net_io_counters().bytes_recv
            download_speed.append((second_recv - first_recv))
            return download_speed
    def UPLoadNetMonitorWatchDog():
        upload_speed = []
        while True:
            first_sent = net_io_counters().bytes_sent
            sleep(1)
            second_sent = net_io_counters().bytes_sent
            upload_speed.append((second_sent - first_sent))
            return upload_speed
    def PackageSentMonitor():
        sent_pack = []
        while True:
            sent_pack.append(net_io_counters().packets_sent)
            return sent_pack
    def PackageReceiveMonitor():
        sent_pack = []
        while True:
            sent_pack.append(net_io_counters().packets_sent)
            return sent_pack
    #构建数据框以实时存储监视器们返回的数据
    df = pd.DataFrame(CPUMonitorWatchDog())
    #设置列长更新,以保证长度Y轴长度能够实时更新
    monitor_columns = [f"CPU占用率" for i in range(len(df.columns))]
    df.columns = monitor_columns  
    df["内存占用"] = MemMonitorWatchDog()
    df['读取速度'] = readDiskWatchDog()
    df["写入速度"] = writeDiskWatchDog()
    df["下载速度"] = DownLoadNetMonitorWatchDog()
    df["上传速度"] = UPLoadNetMonitorWatchDog()
    df["发包数量"] = PackageSentMonitor()
    df["接受包数量"] = PackageReceiveMonitor()
    #st.subheader()方法用于生成子标题,对应HTML中的h3标签
    first_header = st.subheader("CPU占用率")
    #使用st.area_chart()生成一个折线面积图的实例
    monitor_chart = st.area_chart(df[monitor_columns])
    second_header = st.subheader("内存占用率")
    mem_usage_charts= st.area_chart(df["内存占用"])
    third_header = st.subheader("磁盘读取速度(KB/s)")
    read_bytes_charts = st.area_chart(df["读取速度"])
    fourth_headers = st.subheader("磁盘写入速度(KB/s)")
    write_bytes_chart = st.area_chart(df['写入速度'])
    fivth_headers = st.subheader("下载速度(kb/s)")
    download_charts = st.area_chart(df["下载速度"])
    sixth_headers = st.subheader("上传速度(kb/s)")
    upload_charts = st.area_chart(df["上传速度"])
    seventh_headers = st.subheader("收包数量(个)")
    packrecv_charts = st.area_chart(df["接受包数量"])
    eighth_headers = st.subheader("发包数量")
    packsent_charts = st.area_chart(df["发包数量"])
    #开始进行数据迭代与填充
    while True:
        #因为每一个chart的类型属于DeletaGenerator对象,在Steamlit的API下
        #可以使用.add_rows()的方法拓宽数据,在参数中声明一个DataFrame和对应列以构建数据流
        monitor_chart.add_rows(pd.DataFrame(CPUMonitorWatchDog(), columns=monitor_columns))
        mem_usage_charts.add_rows(pd.DataFrame(MemMonitorWatchDog(),columns=["内存占用"]))
        read_bytes_charts.add_rows(pd.DataFrame(readDiskWatchDog(),columns=["读取速度"]))
        write_bytes_chart.add_rows(pd.DataFrame(writeDiskWatchDog(), columns=['写入速度']))
        download_charts.add_rows(pd.DataFrame(DownLoadNetMonitorWatchDog(),columns=["下载速度"]))
        upload_charts.add_rows(pd.DataFrame(UPLoadNetMonitorWatchDog(),columns=["上传速度"]))
        packrecv_charts.add_rows(pd.DataFrame(PackageReceiveMonitor(),columns=["接受包数量"]))
        packsent_charts.add_rows(pd.DataFrame(PackageSentMonitor(),columns=["发包数量"]))

构建完最复杂的动态可视化之后,我们尝试来构建存储方面监控与可视化

elif sidebar == "存储监控":
    #使用disk_usage()方法获取到指定盘符下的磁盘容量
    st.header("当前Django实例存储总量:{}".format(round(disk_usage("/").total/1024/1024/1000,2))+"GB")
    #使用disk_partitions()方法获取到当前实例下的所有分区
    st.header("共有{}个分区".format(len(psutil.disk_partitions())))
    total_partitions = []
    each_partion_usage = []
    #通过循环显示出实例下所有的分区,极其名称和占比
    #由于这个APP运行在了虚拟机上,而且虚拟磁盘是被拆分开来的,因此显示时只会获得所有虚拟分区
    for i in range(len(psutil.disk_partitions())):
        st.write("当前{}盘盘符占用:{}%,剩余{}GB".format(psutil.disk_partitions()[i].device,disk_usage(psutil.disk_partitions()[i].device).percent,round(disk_usage(psutil.disk_partitions()[i].device).free/1024/1024/1024),2)+"\n")
        total_partitions.append(psutil.disk_partitions()[i].device)
        each_partion_usage.append(disk_usage(psutil.disk_partitions()[i].device).percent)
    #创建一个plotly离线对象用于绘制分区占比
    pyplt = py.offline.plot
    labels = total_partitions
    values = each_partion_usage
    trace = [go.Pie(
    labels = labels, 
    values = values, 
    hole =  0.7,
    hoverinfo = "label + percent")]
    layout = go.Layout(
        title = "当前实例硬盘分区占比"
    )
    fig = go.Figure(data=trace,layout=layout)
    #调用streamlitAPI下的plotly_chart将之前plotly生成的figure写入到streamlit中
    st.plotly_chart(figure_or_data=fig,use_container_width=True)
    #捕获读写IOPs
    def readIOPsWatchDog():
        read_iops = []
        while True:
            read_iops.append(psutil.disk_io_counters().read_count)
            return read_iops
    def writeIPOsWatchDog():
        write_iops = []
        while True:
            write_iops.append(psutil.disk_io_counters().write_count)
            return write_iops
    df = pd.DataFrame(readIOPsWatchDog())
    readiops_columns = [f"顺序随机读取性能(IOPs)" for i in range(len(df.columns))]
    df.columns = readiops_columns 
    df["顺序随机写入性能(IOPs)"] = writeIPOsWatchDog()
    first_header = st.subheader("顺序随机读取性能(IOPs)")
    readiops_charts= st.area_chart(df["顺序随机读取性能(IOPs)"])
    second_header = st.subheader("顺序随机写入性能(IOPs)")
    writeiops_charts = st.area_chart(df["顺序随机写入性能(IOPs)"])
    while True:
        readiops_charts.add_rows(pd.DataFrame(readIOPsWatchDog(),columns=["顺序随机读取性能(IOPs)"]))
        writeiops_charts.add_rows(pd.DataFrame(writeIPOsWatchDog(),columns=["顺序随机写入性能(IOPs)"]))

接下来来到弹性用户组管理与配置

需要申明一点,这里为了方便查看用户组下的所有有效信息,建议在构建APP之前,记得给root配置免密登陆,否则每一次查看用户组时都需要切回Streamlit的控制台去输入root的密码,十分麻烦

#先检测一下系统内核
    system_kernel = platform.system()
    st.title("当前实例用户组配置信息")
    #这里设置一个用beta_columns()方法生成4个DeltaGenerator类的列对象
    col1,col2,col3,col4 = st.beta_columns(4)
    #针对每一个独立的列绑定一个按钮
    add_group_button = col1.button("添加用户组")
    delete_group_button = col2.button("删除用户组")
    add_user_button = col3.button("添加用户")
    delete_user_button = col4.button("删除用户")
    #当系统内核检测为Linux时则执行如下逻辑
    if system_kernel == "Linux":
        #读取/etc/group下的分组信息
        process = os.popen("sudo cat /etc/group").readlines()
        #创建四个列表用于存储分列数据
        group_name = []
        group_key = []
        gid = []
        user_list = []
        #开始分列
        for line in process:
            group_name.append(line.replace("\n","").split(":")[0])
            group_key.append(line.replace("\n","").split(":")[1])
            gid.append(line.replace("\n","").split(":")[2])
            user_list.append(line.replace("\n","").split(":")[3])
        #创建数据框
        df = pd.DataFrame()
        df["组名称"] = group_name
        df["组密钥"] = group_key
        df["GID"] = gid
        df["用户群"] = user_list
        st.table(df)
        #当按钮被按下时
        if add_group_button:
            #使用text_input()方法创建一个输入框用于记录添加的用户组名
            add_confirm = st.text_input("请输入你想添加的用户组")
            #当前长度不等于0时
            if len(add_confirm) != 0:
                #如果添加的
                if add_confirm.isin(df["组名称"]):
                    st.error("该用户组已存在,无法再次添加")
                else:
                    os.popen("sudo groupadd {}".format(add_confirm))
                    st.success("添加成功")
        if delete_group_button:
            delete_confirm = st.text_input("请确认你删除的用户组对象名称")
            if len(delete_confirm) != 0:
                #生成一个警告信息
                st.warning("该操作会影响实例系统下的用户组信息,请输入:我已知晓后果,并承担风险方可执行该操作!")
                #进行最终确认以防止误删
                final_confirm = st.text_input(label="最终确认")
                if final_confirm == "我已知晓后果,并承担风险方可执行该操作":
                    if delete_confirm in df["组名称"]:
                        os.popen("sudo groupdel {}".format(delete_confirm))
                        st.success("删除成功")
                    else:
                        st.error("请确认该用户组是否存在")
        if add_user_button:
            add_user_confirm = st.text_input("请输入你想添加的用户")
            if len(add_user_confirm) != 0:
                all_user = []
                for i in range(len(df["用户群"])):
                    single_group = df["用户群"][i].split(",")
                    for i in range(len(single_group)):
                        all_user.append(single_group[i])
                if add_user_confirm in all_user:
                    st.error("无法添加!目标用户已存在!")
                else:
                    os.popen("sduo adduser {}")
        if delete_user_button:
            delete_user_confirm = st.text_input("请输入你想删除的用户")
            if len(delete_user_confirm) != 0:
                st.warning("该操作会影响实例系统下的用户组信息,请输入:我已知晓后果,并承担风险方可执行该操作!")
                final_confirm = st.text_input(label="最终确认")
                if final_confirm == "我已知晓后果,并承担风险方可执行该操作":
                    if delete_user_confirm in all_user:
                        os.popen("sudo userdel --remove {}".format(delete_user_confirm))
                        st.success("删除成功")
                    else:
                        st.error("请确认该用户组是否存在")

配置弹性安全组选项

elif sidebar == "弹性安全组":
    #需额外声明,因终端输出值为一个极其不规则的值,因此无法实现十分规整的展示
    #因此如果想查看完整的安全组信息需要仍然在shell内执行
    with st.beta_expander("额外声明!"):
        st.write("如欲获得完整组策略信息,请使用命令终端或服务器Shell端进行查询")
    #设置两个分列
    col1,col2 = st.beta_columns(2)
    #绑定按钮
    add_rule_button = col1.button("添加规则")
    del_rule_button = col2.button("删除规则")
    if add_rule_button:
        col1.subheader("添加规则配置")
        rule_type = st.text_input("规则类型")
        port_rule = st.text_input("端口规则")
        add_button = st.button("确认添加")
        if add_button:
            os.popen("sudo ufw {} {}".format(rule_type,port_rule))
    if del_rule_button:
        col2.subheader("删除规则配置")
        port_rule = col2.text_input("端口规则")
        del_button = col2.button("确认删除")
        if del_button:
            os.popen("sudo ufw delete {}".format(port_rule))

前面有提到过我们导入了PyCuda包用于检测实例下存在的CUDA GPU数量和工作状态,那么我们开始最后一步:检测CUDA GPU

elif sidebar == "GPU管理":
    st.title("GPU技术由NVIDIA提供",st.image("https://developer.nvidia.com/sites/all/themes/devzone_new/favicon.ico"))
    #用pycuda.driver()方法初始化pycuda实例
    drv.init()
    #当driver检测设备时
    if drv.Device.count() != 0:
        #显示当前实例装载了多少个CUDA GPU
        st.header("当前实例装载了{}个CUDA GPU.".format(drv.Device.count()))
        #构建CUDA设备信息存放数据的列表
        cuda_device_name = []
        cuda_device_capability = []
        cuda_device_memory = []
        cuda_device_rundriver = []
        #循环输出CUDA信息并填充至之前构建的列表中
        for i in range(drv.Device.count()):
            device = drv.Device(i)
            cuda_device_name.append(device.name())
            cuda_device_capability.append(float("%d.%d" % device.compute_capability()))
            gpu_memory = device.total_memory()//(1024**2)//1024
            cuda_device_memory.append(gpu_memory)
            cuda_device_rundriver.append(drv.get_version()[0])
        #构建数据框
        df = pd.DataFrame()
        df["CUDA设备名称"] = cuda_device_name
        df["CUDA设备计算能力"] = cuda_device_capability
        df["CUDA设备内存"] = cuda_device_memory
        df["CUDA驱动版本"] = cuda_device_rundriver
        #使用table()方法显示出来
        st.table(df)
    else:
        st.error("当前Django服务器未装载GPU!")

好了,综上我们所有的功能都构建好了,之后就是启动我们的Streamlit APP了,启动方法也非常简单:使用cd命令切换到你的APP目录下,并输入

streamlit run xxx.py

即可顺利的运行啦,当然之前说streamlit的一大优势在于共享性,这也就意味着在你启动streamlit app时不仅有一个本地的localhost地址,同时也会有一个局域网地址。当你的服务器上顺利部署了streamlit的APP时,凡是与服务器处于同一局域网内的设备都可以访问到这个APP了。

最后如果喜欢本期分享的话,记得动动你可爱的小手,点赞、打赏、转发一键三联哦,我是Deaohst,我们下期分享见。

附上码云链接:https://gitee.com/P4r4I10w_W0nd3r/elins-code-storage/tree/myDjango/Streamlit

物联沃分享整理
物联沃-IOTWORD物联网 » 使用Python搭建自动化运维平台并编写运维脚本

发表评论