跳到主要内容

Python 告警代理服务器

· 阅读需 2 分钟

安装python 环境

# 安装
yum install -y epel-release
yum install -y python3
yum install -y python3-pip

# 验证
python3 --version
pip3 --version

# 使用pip3来安装 requests 和 flask 库:
pip3 install requests flask

编写代理服务器

main.py

import json
import requests
from flask import Flask, request

app = Flask(__name__)

# 配置自己的群机器人的 key
WECHAT_API_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=your_key"

def convert_to_wechat_markdown(data):
status = data['status']
group_labels = data.get('groupLabels', {})
common_labels = data.get('commonLabels', {})
alerts = data.get('alerts', [])

# Modify the alert title based on the status
alert_title = f"{status.upper()}告警" if status == 'firing' else "告警已解决"

content = f"# {alert_title}\n"
content += f"> 告警名称: {group_labels.get('alertname')}\n"
content += f"> 严重程度: {common_labels.get('severity')}\n\n"

for alert in alerts:
annotations = alert.get('annotations', {})
starts_at = alert.get('startsAt', '') # 获取故障时间
content += f"#### 告警描述\n> {annotations.get('description')}\n"
content += f"> 故障时间: {starts_at}\n" # 添加故障时间

content += f"\n[面板地址](http://dashboard.grafana.com)\n" # 添加面板地址

return {
'msgtype': 'markdown',
'markdown': {'content': content}
}

@app.route('/proxy', methods=['POST'])
def proxy():
data = request.json
wechat_data = convert_to_wechat_markdown(data)
headers = {
'Content-Type': 'application/json'
}
response = requests.post(WECHAT_API_URL, headers=headers, data=json.dumps(wechat_data))
return response.text, response.status_code

if __name__ == "__main__":
app.run(host="0.0.0.0", port=8000)

启动服务器

nohup python3 main.py &

手动测试

curl -X POST http://localhost:8000/proxy -H "Content-Type: application/json" -d '{
"status": "firing",
"groupLabels": {"alertname": "TestAlert"},
"commonLabels": {
"cluster": "TestCluster",
"service": "TestService",
"severity": "critical"
},
"alerts": [
{
"annotations": {
"description": "This is a test alert."
}
}
]
}'

配置 Alertmanager

global:
resolve_timeout: 5m

route:
group_by: ['alertname']
group_wait: 30s # 初次发送告警延时
group_interval: 5m # 距离第一次发送告警,等待多久再次发送告警
repeat_interval: 24h # 告警重发时间
receiver: 'default-receiver'
routes:
- match:
severity: critical
receiver: 'weixin-receiver'

receivers:
- name: 'default-receiver'
- name: 'weixin-receiver'
webhook_configs:
- url: 'http://127.0.0.1:8000/proxy'
send_resolved: true

自建 OpenAI 镜像站 Demo

· 阅读需 3 分钟

自建 OpenAI 镜像站 Demo

testopenai.py

# coding: utf-8

import openai

# openai.log = "debug"
openai.api_key = "sk-"
openai.api_base = "https://api_gpt4.ai-gaochao.com/v1"


DEBUG = False

# 非流式响应
# completion = openai.ChatCompletion.create(model="gpt-3.5-turbo", messages=[{"role": "user", "content": "Hello world!"}])
# print(completion.choices[0].message.content)

def gpt_api_stream(messages: list):
"""为提供的对话消息创建新的回答 (流式传输)

gpt4长问题需要用流式传输,不然容易报错
Args:
messages (list): 完整的对话消息
api_key (str): OpenAI API 密钥

Returns:
tuple: (results, error_desc)
"""
try:
response = openai.ChatCompletion.create(
model='gpt-4',
# model="gpt-3.5-turbo",
messages=messages,
stream=True,
# temperature=0.5,
# presence_penalty=0,

)
completion = {'role': '', 'content': ''}
for event in response:
if event['choices'][0]['finish_reason'] == 'stop':
if DEBUG:
print(f'收到的完成数据: {completion}')
break
for delta_k, delta_v in event['choices'][0]['delta'].items():
if DEBUG:
print(f'流响应数据: {delta_k} = {delta_v}')
completion[delta_k] += delta_v
messages.append(completion) # 直接在传入参数 messages 中追加消息
msg = completion['content'] # 解析返回的msg
return (True, msg)
except Exception as err:
return (False, f'OpenAI API 异常: {err}')

def gpt_api_no_stream(messages: list):

try:
completion = openai.ChatCompletion.create(
# model="gpt-3.5-turbo",
# model="gpt-4",
model="gpt-4-0613",
messages=messages
)
# print(completion)

msg = None
choices = completion.get('choices', None)
if choices:
msg = choices[0]['message']['content']
else:
msg = completion['message']['content']
return (True, msg)
except Exception as err:
return (False, f'OpenAI API 异常: {err}')


if __name__ == '__main__':

# 一次性回答版
prompt = 'There are 9 birds in the tree, the hunter shoots one, how many birds are left in the tree?' # 短问题
prompt = "鲁迅为什么暴打周树人"
# prompt = "假如你是一个电力行业专家,请回答下面的问题:变压器有哪些部分组成" # 长问题。可以把DEBUG变量设置为True来看流传输信息
print("发送:", prompt)
print("使用的apikey:", openai.api_key)
messages = [{'role': 'user','content': prompt},]
ret, msg = gpt_api_stream(messages)
# ret, msg = gpt_api_no_stream(messages)
print("返回:", msg)
if DEBUG:
print("messages:", messages)



# # 问答版
# messages = []
# while True:
# prompt = input("Q: ")
# messages.append({'role': 'user','content': prompt})
# ret, msg = gpt_api_stream(messages)
# print("A:", msg)

selectmoney.py

import requests
from datetime import datetime, timedelta

def check_billing(api_key, api_url):
# 计算起始日期和结束日期
now = datetime.now()
start_date = now - timedelta(days=90)
end_date = now + timedelta(days=1)
sub_date = now.replace(day=1)

# 设置API请求URL和请求头
url_subscription = f"{api_url}/v1/dashboard/billing/subscription"
url_balance = f"{api_url}/dashboard/billing/credit_grants"
url_usage = f"{api_url}/v1/dashboard/billing/usage?start_date={start_date.strftime('%Y-%m-%d')}&end_date={end_date.strftime('%Y-%m-%d')}"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}

try:
# 获取API限额
response = requests.get(url_subscription, headers=headers)
if response.status_code != 200:
print("查询返回异常。可能是:\n1. apikey没有余额了\n2. apikey无效\n3. apikey的base_url填写错了")
return

subscription_data = response.json()
total_amount = subscription_data['hard_limit_usd']

# 判断总用量是否大于20,若大于则更新start_date为sub_date
if total_amount > 20:
start_date = sub_date

# 重新生成url_usage
url_usage = f"{api_url}/v1/dashboard/billing/usage?start_date={start_date.strftime('%Y-%m-%d')}&end_date={end_date.strftime('%Y-%m-%d')}"

# 获取已使用量
response = requests.get(url_usage, headers=headers)
usage_data = response.json()
total_usage = usage_data['total_usage'] / 100

# 计算剩余额度
remaining = total_amount - total_usage

# 输出总用量、总额及余额信息
print(f"Total Amount: {total_amount:.2f}")
print(f"Used: {total_usage:.2f}")
print(f"Remaining: {remaining:.2f}")

return [total_amount, total_usage, remaining]
except Exception as error:
print(error)
return [None, None, None]

# 使用示例
api_key = "sk-"
api_base = "https://api_gpt4.ai-gaochao.com"

result = check_billing(api_key, api_base)

AI 综合站使用

· 阅读需 1 分钟

ChatGPT 等聊天AI使用

1、Chatgpt 等模型

2、聊天对话

3、提供搜索插件,使用GPT-4.0-Plus模型

4、根据 AI 应用来更好回答问题

Midjourney 等绘画AI使用

AI 音乐生成