osdir.com


[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

How to convert a train running program from synchronous to asynchronous?


I have modelled which starts running once drivers and stations are assigned to it. Otherwise, it doesn?t run, really don?t care if passengers are boarded or not at this moment. :) I think this program can help me to introduce to the Python async programming domain.

Here is my program:

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
# train.py
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 

import time
import random

from user import User

class NotReadyToDeparture(Exception):
    pass

class Train:
    def __init__(self, name, category):
        self.name = name
        self.category = category
        self.__drivers = []
        self.__stations = []
    
    @property
    def drivers(self):
        return self.__drivers
    
    @drivers.setter
    def drivers(self, coachmen):
        self.__drivers = coachmen
    
    @property
    def stations(self):
        return self.__stations
    
    @stations.setter
    def stations(self, places):
        self.__stations = places
    
    def next_stoppage(self):
        return self.stations[0]
    
    def run(self):
        total_run_time = 0
        if len(self.drivers) == 0:
            raise NotReadyToDeparture('Drivers are not yet boarded')
        
        if len(self.stations) == 0:
            raise NotReadyToDeparture('Stoppage stations are not yet scheduled')
        
        for station in range(len(self.stations[:])):
            run_time_to_next_stoppage = random.randint(2, 6)
            time.sleep(run_time_to_next_stoppage)
            total_run_time += run_time_to_next_stoppage
            print("Train Reached at {0} in {1} seconds".format(self.stations.pop(0), run_time_to_next_stoppage))
            
        print(f"Train {self.name} is reached the destination in {total_run_time} seconds.")

if __name__ == "__main__":
    train = Train("Digha Express", "Express")
    train.drivers = [ 
        User(f_name="Manish", l_name="Jha"),
        User(f_name="Deepak", l_name="Das")
    ]
    train.stations = [f'Station {letter}' for letter in ['A', 'B', 'C', 'D']]
    train.run()

# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 
# user.py
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - 

class User:
    def __init__(self, f_name, l_name):
        self.f_name = f_name
        self.l_name = l_name
    
    def full_name(self):
        return f"{self.f_name} {self.l_name}?

Drivers are modelled as users. When I do call the `run` method it behaves as:

Train Reached at Station A in 4 seconds
Train Reached at Station B in 4 seconds
Train Reached at Station C in 3 seconds
Train Reached at Station D in 4 seconds
Train Digha Express is reached the destination in 15 seconds.


What I am looking for is that once I do call train.run() I get the control back, and keep querying what the next stoppage is. something like this:

train.run()
while len(train.stations) is not 0:
    print(f"Next stoppage is {train.stations[0]}")

How should I modify this current program to meet my goal?

Thanks,

Arup Rakshit
ar at zeit.io