Example Python Bot

Bot

This is just an example bot. Basically you can call any command which works on the same system (call webhooks via curl, run ansible playbooks, conquer the world, …).

#include "std_disclaimer.h"
/*
 * Your warranty is now void.
 *
 * I am not responsible for hacked machines, crashed containers,
 * thermonuclear war, or you getting fired because the coffee machine bot hook failed.
 * Please regard this as a proof of concept.
 */

Setup

WORK IN PROGRESS

I’ve prepared a virtualenv to run a Python bot as the same user that runs the signal gateway. I also wanted the bot to be non-blocking, and as I’m not yet familiar enough with async programming, I’m using python-rq to take care of the jobs.

Example workflow:

sequenceDiagram
    Signal DBus Listener->>Redis: Schedule function to redis queue
    Worker-->Redis: Fetch and run job
    Worker->>Signal DBus REST API: Send message

Install Redis:

apt-get -y install redis-server

Install the necessary python packages:

sudo -u signal-cli -i bash -c '"${HOME}"/venv/bin/pip install requests rq'

Listener

/var/lib/signal-cli/signal-cli-dbus-bot/listener.py

def hello(*args, **kwargs):
    """Queue a greeting for the chat given by the ``sender`` keyword argument.

    Positional arguments are accepted but ignored. Nothing is sent from
    here: ``handler.send_message`` is enqueued on ``queue`` with the greeting
    text and the sender.

    Raises:
        KeyError: if no ``sender`` keyword argument was passed.
    """
    greeting = "Hello, this is your chatbot, how can i help you?"
    queue.enqueue(handler.send_message, greeting, kwargs['sender'])


def cat(*args, **kwargs):
    """Queue ``handler.cat`` for the chat given by the ``sender`` keyword.

    Positional arguments are accepted but ignored. A missing ``sender``
    keyword is passed on to the job as ``None``.
    """
    recipient = kwargs.get('sender')
    queue.enqueue(handler.cat, recipient)


# Dispatch table consulted by message_handler: (pattern, callback) pairs.
# Entries are tried in order and the first pattern found in the message wins.
regex_handlers = [
    (r'/hello', hello),
    (r'/cat', cat),
]


def message_handler(timestamp, source, group_id, message, attachments):
    """Dispatch an incoming message to the first matching command handler.

    Registered as ``signal.onMessageReceived``. ``timestamp`` and
    ``attachments`` are accepted but not used.

    The reply target is ``group_id`` when it is set, otherwise ``source``.
    A message whose ``source`` is set but not contained in
    ``signal.listNumbers()`` is dropped without any further processing.

    The message text is searched case-insensitively against each pattern in
    ``regex_handlers``; only the first matching callback is invoked, with
    ``message`` and ``sender`` passed as keyword arguments.
    """
    # Drop messages from sources that are not in the allowed list.
    if source and source not in signal.listNumbers():
        return

    # Prefer the group as reply target, fall back to the individual source.
    sender = group_id or source or None

    for pattern, callback in regex_handlers:
        if re.search(pattern, message, re.IGNORECASE) is None:
            continue
        callback(message=message, sender=sender)
        return

from pydbus import SystemBus
from gi.repository import GLib
import handler
from rq import Queue
from rq.job import Job
from worker import connection
import re

# Connect to the system DBus and prepare the GLib main loop that dispatches
# incoming DBus signals to Python callbacks.
bus = SystemBus()
loop = GLib.MainLoop()
# Proxy object for the signal-cli DBus service.
# NOTE(review): "ACCOUNT" in the object path looks like a placeholder for the
# real account identifier; confirm before deploying.
signal = bus.get("org.asamk.Signal", "/org/asamk/Signal/ACCOUNT")
# RQ queue on the Redis connection imported from worker.py; the command
# handlers above enqueue their jobs here.
queue = Queue(connection=connection)
# Route every received message through message_handler.
signal.onMessageReceived = message_handler
# Blocks until the main loop is stopped.
loop.run()

SystemD unit

/etc/systemd/system/signal-cli-dbus-bot-listener.service

[Unit]
Description=signal-cli DBus Bot
After=multi-user.target signal-cli.service redis.service
Requires=signal-cli.service redis.service

[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-dbus-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 listener.py
Restart=on-failure
RestartSec=20

[Install]
WantedBy=multi-user.target

Enable and start the listener:

systemctl enable --now signal-cli-dbus-bot-listener.service

Worker

Code

/var/lib/signal-cli/signal-cli-dbus-bot/worker.py

import os
import re
import redis
from rq import Queue, Worker

# Names of the RQ queues this worker consumes.
listen = ['default']

# Redis location; override it with the REDIS environment variable.
redis_url = os.getenv('REDIS', 'redis://localhost:6379')

# Shared Redis connection. listener.py and handler.py import it from here.
connection = redis.from_url(redis_url)

if __name__ == '__main__':
    # Pass the connection explicitly rather than through the ``rq.Connection``
    # context manager: that helper was deprecated and then removed in RQ 2.0,
    # so importing it fails with a current ``pip install rq``. The explicit
    # form works on both RQ 1.x and 2.x.
    queues = [Queue(name, connection=connection) for name in listen]
    worker = Worker(queues, connection=connection)
    worker.work()

SystemD unit

/etc/systemd/system/signal-cli-dbus-bot-worker.service

[Unit]
Description=Signal Bot Worker
After=multi-user.target redis.service
Requires=redis.service

[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-dbus-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 worker.py
Restart=on-failure
RestartSec=20

[Install]
WantedBy=multi-user.target

Enable and start the worker:

systemctl enable --now signal-cli-dbus-bot-worker.service

Handler

Contains the functions shared between worker and listener.

/var/lib/signal-cli/signal-cli-dbus-bot/handler.py

import uuid
import requests
import os
from rq import Queue
from worker import connection
from base64 import b64encode

# RQ queue bound to the Redis connection imported from worker.py; the
# functions below use it to enqueue follow-up jobs.
q = Queue(connection=connection)


def requests_get(url, data=None, user=None, password=None, filename=None, stream=False):
    """Issue a GET request and optionally save the response body to a file.

    Args:
        url: Address to fetch.
        data: Optional request body, handed to ``requests.get`` unchanged.
        user: User name for HTTP Basic auth. No credentials are sent when
            this is ``None``.
        password: Password for HTTP Basic auth; only used together with
            ``user``.
        filename: Path the response body is written to. When ``None`` the
            request is still made but nothing is saved.
        stream: Handed to ``requests.get``; when true the body is fetched
            lazily while it is written to ``filename``.

    Returns:
        ``True`` only when ``filename`` was given, the server answered with
        status 200 and the body was written to disk; ``False`` otherwise.
    """
    # Only build credentials when a user was supplied: requests turns
    # (None, None) into a bogus "None:None" Basic auth header.
    auth = (user, password) if user is not None else None
    # NOTE(review): verify=False disables TLS certificate verification. It is
    # kept so existing callers behave as before, but it should be dropped
    # unless an endpoint really needs it.
    with requests.get(
        url, auth=auth, data=data, stream=stream, verify=False
    ) as result:
        if filename is None or result.status_code != 200:
            return False
        with open(filename, "wb") as f:
            # Write in chunks so stream=True does not buffer the whole body
            # in memory; with stream=False this iterates the loaded content.
            for chunk in result.iter_content(chunk_size=65536):
                f.write(chunk)
        return True


def requests_post(
    url, data=None, user=None, password=None, files=None, json=None, stream=False
):
    """Issue a POST request and return the response.

    Args:
        url: Address to post to.
        data: Optional form/body payload, handed to ``requests.post``.
        user: User name for HTTP Basic auth. No credentials are sent when
            this is ``None``.
        password: Password for HTTP Basic auth; only used together with
            ``user``.
        files: Optional file payload, handed to ``requests.post``.
        json: Optional JSON-serialisable payload, handed to ``requests.post``.
        stream: Handed to ``requests.post`` unchanged.

    Returns:
        The ``requests.Response`` object, so callers can inspect the status.
        Callers that ignore the return value are unaffected.
    """
    # Only build credentials when a user was supplied: requests turns
    # (None, None) into a bogus "None:None" Basic auth header.
    auth = (user, password) if user is not None else None
    return requests.post(
        url, auth=auth, json=json, data=data, files=files, stream=stream
    )


def cleanup_attachments(filename):
    """Delete a temporary attachment file.

    A file that is already gone is not treated as an error, so the cleanup
    job does not fail when it is retried or when the attachment was never
    written. Other ``OSError`` subclasses (for example permission problems)
    still propagate.

    Args:
        filename: Path of the file to remove.

    Returns:
        None.
    """
    try:
        os.remove(filename)
    except FileNotFoundError:
        # Nothing left to clean up.
        pass


def send_message(message, recipient, filename=None):
    """Send a message, optionally with one attachment, via the local REST API.

    Args:
        message: Text to send.
        recipient: Target of the message. A ``list`` is converted with
            ``bytearray`` and base64-encoded before sending (presumably the
            raw group id bytes received over DBus; confirm against the
            listener). Any other value is sent as-is.
        filename: Optional path of a file to attach. Its content is sent
            base64-encoded, and a ``cleanup_attachments`` job is always
            queued for it afterwards, whether or not sending succeeded.

    Errors while building or posting the request are printed and swallowed,
    so a failed send never raises to the caller.
    """
    try:
        # Encode the recipient BEFORE building the payload; otherwise the
        # payload would still carry the raw list and the encoding is lost.
        if isinstance(recipient, list):
            recipient = b64encode(bytearray(recipient)).decode()
        data = {
            "message": message,
            "recipients": [recipient],
        }
        if filename:
            with open(filename, "rb") as f_h:
                data["base64_attachments"] = [b64encode(f_h.read()).decode()]
        requests_post("http://localhost:8080/v2/send", json=data)
    except Exception as err:
        # Deliberate best-effort: report the failure but keep the job alive.
        print(err)
    finally:
        if filename:
            q.enqueue(cleanup_attachments, filename)


def cat(sender):
    """Download a cat GIF and queue it to be sent to ``sender``.

    The image is saved under a random UUID file name (relative to the
    current working directory). If the download succeeds, a ``send_message``
    job carrying that file as attachment is enqueued; otherwise nothing is
    queued.

    Args:
        sender: Recipient the picture is sent back to.
    """
    filename = str(uuid.uuid4())
    downloaded = requests_get(
        "http://thecatapi.com/api/images/get?format=src&type=gif",
        filename=filename,
        stream=True,
    )
    if downloaded:
        # enqueue() needs the callable and its arguments separately. Writing
        # send_message(...) here would run it right away and enqueue its
        # None return value instead of a job.
        q.enqueue(
            send_message, "Here's your random " + "\U0001F63B", sender, filename
        )