Example Python Bot

Bot

This is just an example bot. Basically you can call any command which works on the same system (call webhooks via curl, run ansible playbooks, conquer the world, …).

#include "std_disclaimer.h"
/*
 * Your warranty is now void.
 *
 * I am not responsible for hacked machines, crashed containers,
 * thermonuclear war, or you getting fired because the coffee machine bot hook failed.
 * Please regard this as a proof of concept.
 */

Setup

WORK IN PROGRESS

I’ve prepared a virtualenv to run a Python bot as the same user that runs the Signal REST API. I also wanted the bot to be non-blocking, and as I’m not yet familiar enough with async programming, I’m using python-rq to take care of the jobs.

Example workflow:

sequenceDiagram
    Signal CLI socket listener->>Redis: Schedule function to redis queue
    Worker-->Redis: Fetch and run job
    Worker->>Signal CLI REST API: Send message
Install the Redis server:

apt-get -y install redis-server

Install the necessary python packages:

sudo -u signal-cli -i bash -c '"${HOME}"/venv/bin/pip install requests rq'

Listener

/var/lib/signal-cli/signal-cli-jsonrpc-bot/listener.py

import re
import socket
from json import dumps, loads
from time import sleep
from uuid import uuid4

import handler
from rq import Queue
from worker import connection

def hello(*args, **kwargs):
    """Queue a greeting for the sender of the incoming message.

    Positional arguments are accepted but ignored; the ``sender`` keyword is
    required. The actual sending is done by ``handler.send_message`` once a
    worker picks the job up from the module-level ``queue``.
    """
    recipient = kwargs['sender']
    greeting = "Hello, this is your chatbot, how can i help you?"
    queue.enqueue(handler.send_message, greeting, recipient)


def cat(*args, **kwargs):
    """Queue the ``handler.cat`` job for the sender of the incoming message.

    Positional arguments are accepted but ignored; a missing ``sender``
    keyword is passed on as ``None``.
    """
    recipient = kwargs.get('sender')
    queue.enqueue(handler.cat, recipient)


# Command table: (pattern, callback) pairs. message_handler() tries them in
# order with a case-insensitive re.search() and runs only the first match.
regex_handlers = [(r'/hello', hello), (r'/cat', cat)]

def message_handler(data):
    """Dispatch one incoming JSON-RPC object to the matching bot command.

    Only ``receive`` notifications that carry a non-empty text message are
    handled. When the envelope names a source, that source must appear in
    the account's contact list (fetched with a ``listContacts`` request via
    ``jsonrpc``); otherwise the message is dropped. The reply target passed
    to the command is the group id for group messages, else the source.

    Args:
        data: Decoded JSON object read from the signal-cli socket.

    Returns:
        None. The first entry of ``regex_handlers`` whose pattern matches the
        message is called for its side effects.
    """
    if data.get("method") != "receive":
        return
    # ``or {}`` also covers keys that are present but JSON null.
    params = data.get("params") or {}
    envelope = params.get("envelope") or {}
    if not envelope:
        return
    data_message = envelope.get("dataMessage") or {}
    message = data_message.get("message")
    if not message:
        return
    timestamp = envelope.get("timestamp")
    source = envelope.get("source")
    group_id = (data_message.get("groupInfo") or {}).get("groupId")
    sender = group_id or None
    # NOTE(review): a group message without a source skips the contact check
    # below (unchanged from the original behaviour) - confirm this is wanted.
    if source:
        res = jsonrpc(
            data={
                "method": "listContacts",
                "params": {
                    "account": params.get("account"),
                },
            },
        )
        # The lookup can come back empty-handed (no reply, or an error reply
        # without "result"); treat that as "no known contacts" instead of
        # crashing the listener.
        contacts = (res or {}).get("result") or []
        known_numbers = {
            contact.get("number")
            for contact in contacts
            if isinstance(contact, dict)
        }
        if source not in known_numbers:
            return
        if not sender:
            sender = source
    if not sender:
        # Nobody to reply to, so there is nothing useful a command could do.
        return
    for regex, function in regex_handlers:
        if re.search(regex, message, re.IGNORECASE):
            function(
                message=message,
                sender=sender,
                timestamp=timestamp,
            )
            break
    return


def jsonrpc(host: str = "localhost", port: int = 7583, data: dict = {}, **kwargs):
    """Talk to the signal-cli JSON-RPC socket (newline-delimited JSON).

    Two modes, selected by ``data``:

    * ``data`` given: send it as one JSON-RPC 2.0 request (``jsonrpc`` and a
      fresh ``id`` are added to a copy) and return the decoded response.
    * ``data`` empty: stay connected and hand every received JSON object to
      ``message_handler``.

    Args:
        host: Host name, or ``unix://<path>`` for a UNIX domain socket.
        port: TCP port; ignored for UNIX sockets.
        data: Request object to send. It is never modified, so the shared
            ``{}`` default stays empty.
        **kwargs: ``attempt`` (current reconnect attempt, default 0) and
            ``retries`` (maximum number of reconnect attempts, default 3).

    Returns:
        The decoded response object in request mode.

    Raises:
        SystemExit: The connection is still refused after all retries.
        ConnectionError: The peer closed the connection.
        Exception: Any other error is printed and re-raised.
    """
    attempt = kwargs.get("attempt", 0)
    retries = kwargs.get("retries", 3)
    payload = None
    if data:
        # Work on a copy so the caller's dict is left untouched.
        payload = dict(data, jsonrpc="2.0", id=str(uuid4()))
    sock_type = socket.AF_INET
    sock_conn = (host, port)
    if host.startswith("unix://"):
        sock_type = socket.AF_UNIX
        sock_conn = host[len("unix://"):]
    with socket.socket(sock_type, socket.SOCK_STREAM) as sock:
        try:
            sock.connect(sock_conn)
            if payload:
                sock.settimeout(10)
                sock.sendall(dumps(payload).encode("utf-8"))
                sock.shutdown(socket.SHUT_WR)
            # Buffered line reader instead of one recv() call per byte.
            with sock.makefile("rb") as stream:
                for raw_line in stream:
                    if not raw_line.strip():
                        continue
                    res = loads(raw_line.decode("utf-8"))
                    if not payload:
                        message_handler(res)
                    elif "method" not in res:
                        # JSON-RPC 2.0 responses never carry "method"; objects
                        # that do are notifications, which must not be
                        # mistaken for the reply to our request.
                        return res
            # The iterator only ends on EOF. Fail loudly rather than spinning
            # on empty reads, so a supervisor can restart the process.
            raise ConnectionError(f"Connection closed by peer ({sock_conn})")
        except ConnectionRefusedError:
            print(f"Connection refused ({sock_conn})")
            if attempt > retries:
                print("Reached max number of retries, exiting")
                raise SystemExit(1)
            print(f"Trying to reconnect ({attempt}/{retries})")
            sleep(2)
        except Exception as err:
            error = getattr(err, "message", repr(err))
            print(error)
            raise
    # Only the "connection refused" path gets here. Retry with the same
    # target and request, after the failed socket has been closed, and hand
    # the result back to the caller.
    return jsonrpc(
        host=host, port=port, data=data, attempt=attempt + 1, retries=retries
    )

# Job queue used by the command callbacks (hello, cat); it shares the Redis
# connection imported from worker.py and must exist before messages arrive.
queue = Queue(connection=connection)
# Start listening: called without data, jsonrpc() connects to the default
# localhost:7583 socket and feeds every received object to message_handler().
jsonrpc()

SystemD unit

/etc/systemd/system/signal-cli-jsonrpc-bot-listener.service

[Unit]
Description=signal-cli Bot
After=multi-user.target signal-cli.service redis.service
Requires=signal-cli.service redis.service

[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-jsonrpc-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 listener.py
Restart=on-failure
RestartSec=20

[Install]
WantedBy=multi-user.target

Enable and start the listener:

systemctl enable --now signal-cli-jsonrpc-bot-listener.service

Worker

Code

/var/lib/signal-cli/signal-cli-jsonrpc-bot/worker.py

import os
import re
import redis
from rq import Worker, Queue

# Names of the RQ queues this worker consumes.
listen = ['default']

# Redis location; override with the REDIS environment variable.
redis_url = os.getenv('REDIS', 'redis://localhost:6379')

# Shared Redis connection; listener.py and handler.py import it from here.
connection = redis.from_url(redis_url)

if __name__ == '__main__':
    # Pass the connection explicitly instead of using the ``Connection``
    # context manager: recent RQ releases no longer provide it, and the
    # explicit form also works on older ones.
    queues = [Queue(name, connection=connection) for name in listen]
    worker = Worker(queues, connection=connection)
    worker.work()

SystemD unit

/etc/systemd/system/signal-cli-jsonrpc-bot-worker.service

[Unit]
Description=Signal Bot Worker
After=multi-user.target redis.service
Requires=redis.service

[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-jsonrpc-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 worker.py
Restart=on-failure
RestartSec=20

[Install]
WantedBy=multi-user.target

Enable and start the worker:

systemctl enable --now signal-cli-jsonrpc-bot-worker.service

Handler

Contains the functions shared between worker and listener.

/var/lib/signal-cli/signal-cli-jsonrpc-bot/handler.py

import uuid
import requests
import os
from rq import Queue
from worker import connection
from base64 import b64encode

# Queue on the Redis connection imported from worker.py; used below to
# schedule follow-up jobs (sending messages, removing attachment files).
q = Queue(connection=connection)


def requests_get(
    url, data=None, user=None, password=None, filename=None, stream=False, timeout=30
):
    """Fetch ``url`` and, if requested, store the response body in a file.

    Args:
        url: Address to fetch.
        data: Optional request body passed through to ``requests.get``.
        user: User name for HTTP basic auth; no auth header is sent when None.
        password: Password for HTTP basic auth.
        filename: Path to write the response body to.
        stream: Passed through to ``requests.get``.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        True only when ``filename`` was given, the server answered 200 and
        the body was written; False in every other case.
    """
    # Only send credentials when some were given; a (None, None) tuple would
    # otherwise be turned into a bogus basic-auth header.
    auth = (user, password) if user is not None else None
    # TLS verification is left at the library default (enabled) rather than
    # being switched off for every request.
    with requests.get(
        url, auth=auth, data=data, stream=stream, timeout=timeout
    ) as result:
        if filename is None or result.status_code != 200:
            return False
        with open(filename, "wb") as f:
            f.write(result.content)
    return True


def requests_post(
    url,
    data=None,
    user=None,
    password=None,
    files=None,
    json=None,
    stream=False,
    timeout=30,
):
    """POST to ``url`` and raise if the server reports an error.

    Args:
        url: Address to post to.
        data: Optional form/body data.
        user: User name for HTTP basic auth; no auth header is sent when None.
        password: Password for HTTP basic auth.
        files: Optional files for a multipart upload.
        json: Optional object sent as the JSON body.
        stream: Passed through to ``requests.post``.
        timeout: Seconds to wait for the server before giving up.

    Raises:
        requests.HTTPError: The server answered with a 4xx/5xx status.
    """
    # Only send credentials when some were given; a (None, None) tuple would
    # otherwise be turned into a bogus basic-auth header.
    auth = (user, password) if user is not None else None
    with requests.post(
        url,
        auth=auth,
        json=json,
        data=data,
        files=files,
        stream=stream,
        timeout=timeout,
    ) as response:
        # Surface rejected requests instead of silently dropping them.
        response.raise_for_status()


def cleanup_attachments(filename):
    """Delete a temporary attachment file.

    A file that is already gone is not an error: the cleanup job is queued
    even when sending failed before the file could be read.

    Args:
        filename: Path of the file to remove.

    Returns:
        None.
    """
    try:
        os.remove(filename)
    except FileNotFoundError:
        pass


def send_message(message, recipient, filename=None):
    """Send a text message, optionally with one attachment, via the REST API.

    Errors while building or posting the request are printed, not raised.
    When ``filename`` is given, a job removing that file is queued afterwards
    whether or not sending succeeded.

    Args:
        message: Text to send.
        recipient: Recipient identifier; a list of byte values is converted
            to its base64 string first.
        filename: Optional path of a file to attach (sent base64-encoded).
    """
    try:
        if isinstance(recipient, list):
            recipient = b64encode(bytearray(recipient)).decode()
        # Build the payload only after the conversion above, so the encoded
        # recipient is what actually gets sent.
        data = {
            "message": message,
            "recipients": [recipient],
        }
        if filename:
            with open(filename, "rb") as f_h:
                data["base64_attachments"] = [b64encode(f_h.read()).decode()]
        requests_post("http://localhost:8080/v2/send", json=data)
    except Exception as err:
        print(err)
    finally:
        if filename:
            q.enqueue(cleanup_attachments, filename)


def cat(sender):
    """Download a random cat GIF and queue it to be sent to ``sender``.

    The image is stored in the current working directory under a random UUID
    file name. Nothing is queued when the download fails.

    Args:
        sender: Recipient to send the picture to.
    """
    filename = str(uuid.uuid4())
    downloaded = requests_get(
        "http://thecatapi.com/api/images/get?format=src&type=gif",
        filename=filename,
        stream=True,
    )
    if downloaded:
        # Hand the callable and its arguments to enqueue() separately;
        # calling send_message(...) here would run it immediately and try to
        # enqueue its None return value.
        q.enqueue(
            send_message, "Here's your random " + "\U0001F63B", sender, filename
        )