This is just an example bot. Basically you can call any command which works on the same system (call webhooks via curl, run ansible playbooks, conquer the world, …).
#include "std_disclaimer.h"
/*
* Your warranty is now void.
*
* I am not responsible for hacked machines, crashed containers,
* thermonuclear war, or you getting fired because the coffe machine bot hook failed.
* Please regard this as a proof of concept.
*/
WORK IN PROGRESS
I’ve prepared a virtualenv to run a python bot as the same user which runs the signal gateway. I also wanted to run a non-blocking bot and as i’m not yet familiar enough with async programming, i’m using python-rq to take care of the jobs.
Example workflow:
apt-get -y install redis-server
Install the necessary python packages:
sudo -u signal-cli -i bash -c '"${HOME}"/venv/bin/pip install requests rq'
/var/lib/signal-gateway/signal-cli-dbus-bot/listener.py
def hello(*args, **kwargs):
job = queue.enqueue(handler.send_message, "Hello, this is your chatbot, how can i help you?", kwargs['sender'])
def cat(*args, **kwargs):
job = queue.enqueue(handler.cat, kwargs.get('sender'))
regex_handlers = [
(r'/hello', hello),
(r'/cat', cat),
]
def message_handler(timestamp, source, group_id, message, attachments):
sender = None
if group_id:
sender = group_id
if source:
if source not in signal.listNumbers():
return
if not sender:
sender = source
for regex, function in regex_handlers:
if re.search(regex, message, re.IGNORECASE):
function(message=message, sender=sender)
break
return
from pydbus import SystemBus
from gi.repository import GLib
import handler
from rq import Queue
from rq.job import Job
from worker import connection
import re
bus = SystemBus()
loop = GLib.MainLoop()
signal = bus.get("org.asamk.Signal", "/org/asamk/Signal/ACCOUNT")
queue = Queue(connection=connection)
signal.onMessageReceived = message_handler
loop.run()
/etc/systemd/system/signal-cli-dbus-bot-listener.service
[Unit]
Description=signal-cli DBus Bot
After=multi-user.target signal-cli.service redis.service
Requires=signal-cli.service redis.service
[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-dbus-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 listener.py
Restart=on-failure
RestartSec=20
[Install]
WantedBy=multi-user.target
Enable and start the listener:
systemctl enable --now signal-cli-dbus-bot-listener.service
/var/lib/signal-cli/signal-cli-dbus-bot/worker.py
import os
import re
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDIS', 'redis://localhost:6379')
connection = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(connection):
worker = Worker(list(map(Queue, listen)))
worker.work()
/etc/systemd/system/signal-cli-dbus-bot-worker.service
[Unit]
Description=Signal Bot Worker
After=multi-user.target redis.service
Requires=redis.service
[Service]
Type=idle
User=signal-cli
Group=signal-cli
Environment=REQUESTS_CA_BUNDLE=/etc/ssl/certs/ca-certificates.crt
WorkingDirectory=/var/lib/signal-cli/signal-cli-dbus-bot
ExecStart=/var/lib/signal-cli/venv/bin/python3 worker.py
Restart=on-failure
RestartSec=20
[Install]
WantedBy=multi-user.target
Enable and start the worker:
systemctl enable --now signal-cli-dbus-bot-worker.service
Contains the functions shared between worker and listener.
/var/lib/signal-cli/signal-cli-dbus-bot/handler.py
import uuid
import requests
import os
from rq import Queue
from worker import connection
from base64 import b64encode
q = Queue(connection=connection)
def requests_get(url, data=None, user=None, password=None, filename=None, stream=False):
result = requests.get(
url, auth=(user, password), data=data, stream=stream, verify=False
)
if filename is not None:
if result.status_code == 200:
with open(filename, "wb") as f:
f.write(result.content)
return True
return False
def requests_post(
url, data=None, user=None, password=None, files=None, json=None, stream=False
):
requests.post(
url, auth=(user, password), json=json, data=data, files=files, stream=stream
)
def cleanup_attachments(filename):
return os.remove(filename)
def send_message(message, recipient, filename=None):
data = {
"message": message,
"recipients": [recipient],
}
try:
if isinstance(recipient, list):
recipient = b64encode(bytearray(recipient)).decode()
if filename:
with open(filename, "rb") as f_h:
data.update({"base64_attachments": [b64encode(f_h.read()).decode()]})
requests_post(f"http://localhost:8080/v2/send", json=data)
except Exception as err:
print(err)
finally:
if filename:
q.enqueue(cleanup_attachments, filename)
def cat(sender):
filename = str(uuid.uuid4())
result = requests_get(
"http://thecatapi.com/api/images/get?format=src&type=gif",
None,
None,
None,
filename,
True,
)
if result:
q.enqueue(
send_message("Here's your random " + "\U0001F63B", sender, filename)
)