Decorators with arguments?


Christopher de Vidal wrote:

> Help please? Creating an MQTT-to-Firestore bridge and I know a decorator
> would help but I'm stumped how to create one. I've used decorators before
> but not with arguments.
> 
> The Firestore collection.on_snapshot() method invokes a callback and sends
> it three parameters (collection_snapshot, changes, and read_time). I need
> the callback to also know the name of the collection so that I can publish
> to the equivalent MQTT topic name. I had thought to add a fourth parameter
> and I believe a decorator is the right approach but am stumped how to add
> that fourth parameter. How would I do this with the code below?
> 
> #!/usr/bin/env python3
> from google.cloud import firestore
> import firebase_admin
> from firebase_admin import credentials
> import json
> import mqtt
> 
> 
> firebase_admin.initialize_app(credentials.Certificate("certs/firebase.json"))
> db = firestore.Client()
> mqtt.connect()
> 
> 
> def load_json(contents):
>     try:
>         return json.loads(contents)
>     except (json.decoder.JSONDecodeError, TypeError):
>         return contents
> 
> 
> def on_snapshot(col_name, col_snapshot, changes, read_time):
>     data = dict()
>     for doc in col_snapshot:
>         serial = doc.id
>         contents = load_json(doc.to_dict()['value'])
>         data[serial] = contents
>     for change in changes:
>         serial = change.document.id
>         mqtt_topic = col_name + '/' + serial
>         contents = data[serial]
>         if change.type.name in ['ADDED', 'MODIFIED']:
>             mqtt.publish(mqtt_topic, contents)
>         elif change.type.name == 'REMOVED':
>             mqtt.publish(mqtt_topic, None)
> 
> 
> # Start repeated code section
> # TODO Better to use decorators but I was stumped on how to pass arguments
> def door_status_on_snapshot(col_snapshot, changes, read_time):
>     on_snapshot('door_status', col_snapshot, changes, read_time)
> 
> 
> door_status_col_ref = db.collection('door_status')
> door_status_col_watch = door_status_col_ref.on_snapshot(door_status_on_snapshot)
> 
> # Repetition...
> def cpu_temp_on_snapshot(col_snapshot, changes, read_time):
>     on_snapshot('cpu_temp', col_snapshot, changes, read_time)
> 
> 
> cpu_temp_col_ref = db.collection('cpu_temp')
> cpu_temp_col_watch = cpu_temp_col_ref.on_snapshot(cpu_temp_on_snapshot)
> # End repeated code section
> 
> # Start repeated code section
> door_status_col_watch.unsubscribe()
> cpu_temp_col_watch.unsubscribe()
> # Repetition...
> # End repeated code section
> 
> Christopher de Vidal
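
On the literal question of getting the collection name into the callback:
a decorator that takes arguments is a function that returns a decorator.
The sketch below is only an illustration. with_collection() is a made-up
name, and on_snapshot() here is a stand-in for the one in the question
that just returns the name it was given. For this particular case
functools.partial() does the same job with less code.

```python
from functools import partial, wraps


def with_collection(col_name):
    """Hypothetical decorator factory: bind col_name as the first argument."""
    def decorator(func):
        @wraps(func)
        def wrapper(col_snapshot, changes, read_time):
            return func(col_name, col_snapshot, changes, read_time)
        return wrapper
    return decorator


def on_snapshot(col_name, col_snapshot, changes, read_time):
    # Stand-in for the on_snapshot() in the question; it only reports
    # which collection it was called for.
    return col_name


# Both lines produce a three-argument callback with the name baked in.
door_status_on_snapshot = with_collection("door_status")(on_snapshot)
cpu_temp_on_snapshot = partial(on_snapshot, "cpu_temp")
```

Either callable should then be usable where the question passes
door_status_on_snapshot to col_ref.on_snapshot(), assuming the library
only needs something it can call with three arguments.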

You might also consider a contextmanager:

https://docs.python.org/3/library/contextlib.html

# untested
from contextlib import contextmanager

@contextmanager
def subscribe(name):
    # Wrap on_snapshot() so the callback also receives the collection name.
    def status_on_snapshot(col_snapshot, changes, read_time):
        on_snapshot(name, col_snapshot, changes, read_time)

    status_col_ref = db.collection(name)
    status_col_watch = status_col_ref.on_snapshot(status_on_snapshot)
    try:
        yield status_col_ref
    finally:
        # Runs on normal exit and on exceptions alike.
        status_col_watch.unsubscribe()


with subscribe("door_status") as door_status_col_ref:
    with subscribe("cpu_temp") as cpu_temp_col_ref:
        ...


If there are many uniform subscriptions, the nested with statements can be
generalized with contextlib.ExitStack:

from contextlib import ExitStack

NAMES = "door_status", "cpu_temp", ...
with ExitStack() as stack:
    col_refs = [
        stack.enter_context(subscribe(name)) for name in NAMES
    ]
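
To see what ExitStack does without a Firestore connection, here is a
small runnable sketch. The subscribe() in it is a hypothetical stand-in
that only records events in a list; the real one would call
on_snapshot() and unsubscribe() at the marked places.

```python
from contextlib import ExitStack, contextmanager

events = []  # records what happens, in order


@contextmanager
def subscribe(name):
    # Stand-in: the real version would call
    # db.collection(name).on_snapshot(...) here.
    events.append("subscribe " + name)
    try:
        yield name
    finally:
        # The real version would call watch.unsubscribe() here.
        events.append("unsubscribe " + name)


NAMES = ("door_status", "cpu_temp")

with ExitStack() as stack:
    col_refs = [stack.enter_context(subscribe(name)) for name in NAMES]
    events.append("running")
```

The stack unwinds in reverse order, so cpu_temp is unsubscribed before
door_status, just as with the nested with statements.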

And if you like Cameron's suggestion, or if the subscribe() generator above
gets too unwieldy, a custom class can act as a context manager, too.

https://docs.python.org/3/reference/compound_stmts.html#with
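
A rough sketch of what such a class might look like. Subscription is a
made-up name, and the db and callback parameters are assumptions: db is
anything with a collection(name) method whose result has on_snapshot(),
and callback takes the four arguments of on_snapshot() in the question.

```python
class Subscription:
    """Hypothetical class-based equivalent of the subscribe() generator."""

    def __init__(self, db, name, callback):
        self.db = db
        self.name = name
        self.callback = callback
        self.watch = None

    def _on_snapshot(self, col_snapshot, changes, read_time):
        # Forward to the four-argument callback, adding the collection name.
        self.callback(self.name, col_snapshot, changes, read_time)

    def __enter__(self):
        col_ref = self.db.collection(self.name)
        self.watch = col_ref.on_snapshot(self._on_snapshot)
        return self

    def __exit__(self, exc_type, exc, tb):
        self.watch.unsubscribe()
        return False  # do not swallow exceptions from the with body
```

It would be used like the generator version, for example
"with Subscription(db, "door_status", on_snapshot) as sub:", and it also
works with ExitStack.enter_context().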