grafana-webhook-to-matrix/grafana_webhook_to_matrix.py
Rune Juhl Jacobsen 3901791a6e
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
debug
2023-02-19 22:20:32 +01:00

57 lines
1.4 KiB
Python

import aiohttp
from aiohttp import web
from datetime import datetime
import os
homeserver = os.environ['MATRIX_HOMESERVER']
access_token = os.environ['MATRIX_ACCESS_TOKEN']
room = os.environ['MATRIX_ROOM']
_last_timestamp = 0
_counter = 0
def unique_number():
global _last_timestamp
global _counter
timestamp = int(datetime.utcnow().timestamp())
if timestamp == _last_timestamp:
_counter += 1
else:
_counter = 0
_last_timestamp = timestamp
return timestamp * 100 + _counter
async def send_message(text):
async with aiohttp.ClientSession() as http:
txid = unique_number()
res = await http.put(
f'https://{homeserver}/_matrix/client/v3/rooms/{room}'
+ f'/send/m.room.message/{txid}',
headers={'Authorization': f'Bearer {access_token}'},
json={
'msgtype': 'm.text',
'body': text,
},
)
res.raise_for_status()
async def handle_alert(request):
body = await request.json()
title = body['title']
status = body['status']
print(body)
await send_message(f'[{status}] {title}')
return web.Response(text='Ok')
app = web.Application()
app.add_routes([
web.post('/alert', handle_alert)
])
if __name__ == '__main__':
print('grafana_webhook_to_matrix.py starting')
web.run_app(app, host='0.0.0.0', port=8003)