APIs

Environment

class pesim.sim.Environment(start=False, stop_time=TIME_FOREVER)

The simulation environment:

e = Environment(start=True)
# add some processes
e.run_until(3600, TIME_PASSED)
# add some other processes or do something else
e.join()  # or e.run_until(TIME_FOREVER, TIME_PASSED)
e.finish()

Instead of calling start() and finish() explicitly, the Environment can also be used as a context manager:

with Environment() as e:
    # add some processes
    e.run_until(3600, TIME_PASSED)
    # add some other processes or do something else

In this case, the environment will be automatically started (when entering the context), joined and finished (when exiting the context).
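The start / join / finish sequence performed by the context manager can be pictured with a small stand-in class. This is only a sketch of the ordering described above, not pesim's implementation; ToyEnvironment and its calls list are hypothetical names used for illustration.

```python
class ToyEnvironment:
    """Hypothetical stand-in that only records lifecycle calls."""

    def __init__(self):
        self.calls = []

    def start(self):
        self.calls.append("start")

    def join(self):
        self.calls.append("join")

    def finish(self):
        self.calls.append("finish")

    def __enter__(self):
        # Entering the context starts the environment.
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Leaving the context joins, then finishes.
        self.join()
        self.finish()
        return False  # do not swallow exceptions


with ToyEnvironment() as env:
    env.calls.append("user code")

print(env.calls)
```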

Parameters
  • start (bool) – Start the simulation immediately. Optional, default is False. If it is False, the environment needs to be started by start() explicitly.

  • stop_time (double) – Maximum time of simulation run. Used in join(). Default is TIME_FOREVER.

time

Current time in the simulation environment.

Type

float

started

Whether the simulation is started.

Type

bool

stop_time

Maximum time of simulation run.

Type

double

start()

Start the simulation.

join()

Process all the remaining events. In other words, this call runs the simulation until self.stop_time.

finish()

Finish the simulation.

run_until(ex_time, after_reason=TIME_PASSED)

Run the simulation until a specific time.

Parameters
  • ex_time (double) – A time.

  • after_reason (int) – A reason. If an event has a triggering time of ex_time and a reason that is greater than after_reason, it will not be processed and the simulation will stop before it.

Returns

Current time (float) in the simulation after the run.
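The effect of after_reason can be sketched with a plain priority queue, assuming events are ordered by the pair (time, reason). The integer values chosen for TIME_REACHED and TIME_PASSED below are made up for the sketch (the real constants live in pesim.define), and this run_until is a hypothetical free function, not the library method.

```python
import heapq

# Hypothetical stand-in values; the real constants live in pesim.define.
TIME_REACHED = -(2**31)
TIME_PASSED = 2**31 - 1


def run_until(queue, ex_time, after_reason=TIME_PASSED):
    """Pop events while (time, reason) <= (ex_time, after_reason)."""
    processed = []
    while queue and queue[0][:2] <= (ex_time, after_reason):
        processed.append(heapq.heappop(queue))
    return processed


def make_queue():
    # Events are (time, reason, name) tuples.
    events = [(10.0, TIME_REACHED, "a"), (10.0, 0, "b"),
              (10.0, TIME_PASSED, "c"), (20.0, 0, "d")]
    heapq.heapify(events)
    return events


# Stopping at TIME_PASSED processes every event scheduled at t=10.
all_at_10 = [name for _, _, name in run_until(make_queue(), 10.0, TIME_PASSED)]
# Stopping at TIME_REACHED leaves the later reasons at t=10 unprocessed.
only_first = [name for _, _, name in run_until(make_queue(), 10.0, TIME_REACHED)]
print(all_at_10, only_first)
```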

process(func, *args, loop_forever=False, **kwargs)

Turn a generator function into a Process. The resulting process will be added to the environment automatically.

The following code turns the tick function into a process, which will print the time every 5 seconds:

env = Environment()

def tick(self, interval):
    yield self.time + interval, TIME_PASSED
    print("tick!", self.time)

p0 = env.process(tick, 5, loop_forever=True)

Parameters
  • func – A generator function that contains at least one yield statement.

  • *args – Positional arguments to be passed to func.

  • loop_forever – Whether to loop the execution of func.

  • **kwargs – Keyword arguments to be passed to func.

Returns

The resulting process (Process).
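One way to picture loop_forever=True is as a wrapper that restarts the generator function every time it is exhausted. Whether pesim does this internally is an assumption; the loop_forever helper and the simplified tick below are hypothetical and independent of the library.

```python
from itertools import islice


def loop_forever(func, *args, **kwargs):
    """Restart func's generator every time it is exhausted."""
    while True:
        yield from func(*args, **kwargs)


def tick(interval):
    # A one-shot generator function: yields once, then returns.
    yield interval


# Take the first three values from the otherwise endless stream.
first_three = list(islice(loop_forever(tick, 5), 3))
print(first_three)
```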

next_event_time()

Time of the next event in the simulation environment.

Returns

Time as a float number.

Process

class pesim.process.Process(env)

Base class of all processes. A process can be implemented by using Environment.process(), or by directly inheriting this class. There are two ways to define a process by inheriting this class:

  1. overriding __call__()

  2. overriding _wait() and _process()

If __call__() is overridden, any implementation of _wait() and _process() will be ignored. The content of __call__() will be executed as the process logic. Note that __call__() will be executed only once, so if the process needs to do something repeatedly, do not forget to wrap the code in an infinite loop.

_wait() and _process() are a convenience for creating repeating tasks. _wait() is supposed to return the process’s next activation time and reason, and _process() does the actual work. This wait -> process pattern is repeated indefinitely. The default implementation of __call__() can be thought of as the following (simplified, not exact):

while True:
    yield self._wait()
    p = self._process()
    if isgenerator(p):
        yield from p

_process() can be either a generator or a normal method, depending on whether the process needs to wait while processing the task.
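The difference between the two kinds of _process() can be seen by driving the simplified loop by hand. The classes below are hypothetical stand-ins that do not depend on pesim, and the string reasons are placeholders for real activation reasons.

```python
from inspect import isgenerator
from itertools import islice


class ToyProcess:
    """Hypothetical stand-in; only models the wait -> process loop."""

    def __init__(self):
        self.time = 0
        self.log = []

    def _wait(self):
        return self.time + 5, "wait"

    def _process(self):
        # Plain method: does its work without pausing.
        self.log.append("work")

    def __call__(self):
        while True:
            yield self._wait()
            p = self._process()
            if isgenerator(p):
                yield from p


class ToyGeneratorProcess(ToyProcess):
    def _process(self):
        # Generator: pauses in the middle of the task.
        self.log.append("work, part 1")
        yield self.time + 10, "pause"
        self.log.append("work, part 2")


# Collect the first two (time, reason) pairs each process yields.
plain = list(islice(ToyProcess()(), 2))
paused = list(islice(ToyGeneratorProcess()(), 2))
print(plain, paused)
```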

Examples

Here is an example process defined in four ways:

  1. Passing a generator function to env.process():

    def tick(self):
        while True:
            yield self.time + 5, TIME_REACHED
            print("tick", self.time)
            yield self.time + 10, TIME_PASSED
            print("tack", self.time)
    
    with Environment() as env:
        p = env.process(tick)
    
  2. Passing a generator function to env.process() with loop_forever=True:

    def tick(self):
        yield self.time + 5, TIME_REACHED
        print("tick", self.time)
        yield self.time + 10, TIME_PASSED
        print("tack", self.time)
    
    with Environment() as env:
        p = env.process(tick, loop_forever=True)
    
  3. Inheriting Process and overriding __call__():

    class Tick(Process):
        def __call__(self):
            while True:
                yield self.time + 5, TIME_REACHED
                print("tick", self.time)
                yield self.time + 10, TIME_PASSED
                print("tack", self.time)
    
    with Environment() as env:
        p = Tick(env)
    
  4. Inheriting Process and overriding _wait() and _process():

    class Tick(Process):
        def _wait(self):
            return self.time + 5, TIME_REACHED
    
        def _process(self):
            print("tick", self.time)
            yield self.time + 10, TIME_PASSED
            print("tack", self.time)
    
    with Environment() as env:
        p = Tick(env)
    
Parameters

env (Environment) – The environment the process belongs to. After creation, the process will be added to the environment automatically.

time

Current time in the simulation environment. This is just an alias of the corresponding environment’s time attribute. In other words, all the processes in the same environment share the same time.

Type

float

__call__()

Override this for the main logic of the process.

Yields

A 2-tuple of time (float), the next activation time, and reason (int), the next activation reason.

_wait()

Override this method to return the next activation time and reason. Note that this method is not a generator.

Returns

time (float) and reason (int)

_process()

Override this method to perform one task. This method can be implemented either as a generator with yield statements, or as a normal method with an optional return statement. If there are yield statements, the process will be paused and reactivated accordingly to perform the simulation logic.

Yields

time (float) and reason (int)

Returns

Any return value is ignored.

activate(time, reason)

Activate a process at the time with a reason.

Parameters
  • time (float) – Activation time. It should be equal to or greater than the current simulation time. If an earlier time is passed, the process will be activated immediately.

  • reason (int) – The activation reason.

next_activation_time()

The process’s next activation time. This method can only be called while the process is paused.

Returns

Time as a float

Constants

pesim.define.TIME_FOREVER

A pre-defined time which can be considered as +inf.

Type

float

pesim.define.TIME_PASSED

A pre-defined reason which can be considered as +inf. When a process is activated at time t with this reason, all the other events at time t have already been processed.

Type

int

pesim.define.TIME_REACHED

A pre-defined reason which can be considered as -inf. When a process is activated at time t with this reason, none of the other events at time t have been processed yet.

Type

int

pesim.define.LOCK_RELEASED

A pre-defined reason indicating that a synchronisation primitive has been released.

Type

int

Synchronisation Primitives

This module defines the following synchronisation primitives: Lock, OrderedLock, Semaphore, RLock and Condition.

Lock, OrderedLock, Semaphore and RLock all have acquire() and release() methods. acquire() waits on the primitive object until it becomes unlocked, and then sets its state to locked, while release() unlocks a previously locked primitive object. A call to acquire() can be blocking or non-blocking. If it is non-blocking and the object is currently locked, the call returns False immediately. If it is blocking (the default), it can be used in a yield statement inside the process logic. The process will be reactivated once the object becomes unlocked again:

def consumer(self, task_queue, sem:Semaphore):
    while True:
        yield sem.acquire(self)
        #retrieve a task from task_queue and process the task

def producer(self, task_queue, sem:Semaphore):
    while True:
        yield self.time + 5, TIME_REACHED
        #push a new task into task_queue
        sem.release()

Condition does not have acquire() and release() methods. Instead, it has set() and wait(), where set() sets the condition and wait() waits until the condition is set, and clear(), which unsets the condition.

OrderedLock has an additional method, declare_acquiring(). An object needs to declare a key before acquiring an OrderedLock. Objects will get the lock in the order of their keys.

class pesim.lock.Lock

The class implementing primitive lock objects. Once a process has acquired a lock, subsequent attempts to acquire it will block, until it is released.

acquire(proc, sync=True)

Acquire the lock.

Parameters
  • proc (Process) – The process acquiring the lock, so that the lock knows which process to wake when it is released. In most cases, this argument is self.

  • sync (bool) – True for a blocking call, and False for a non-blocking call.

Returns

If sync=False, returns True or False to indicate whether the acquisition succeeded. If sync=True, returns a 2-tuple (time, reason) which can be used in a yield statement.

release(reason=LOCK_RELEASED)

Release the lock.

Parameters

reason (int) – If there is a process currently waiting to acquire the object, it will be activated with the given reason. If more than one process is waiting, only one of them will be activated.

class pesim.lock.Semaphore(value)

A Semaphore object has an internal counter whose value can never go below zero. If the counter value is zero, acquire() will block until the value becomes greater than 0. A successful acquire() decreases the internal counter by 1, and release() increases it by 1.

Parameters

value (int) – The initial value of the internal counter.
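The counter rules can be sketched with a toy class that models only the non-blocking case. ToySemaphore is a hypothetical stand-in: it takes no proc argument and never schedules anything, so it shows the counter arithmetic only, not pesim's Semaphore.

```python
class ToySemaphore:
    """Hypothetical counter-only model of non-blocking acquire/release."""

    def __init__(self, value):
        self.value = value

    def acquire(self):
        # Non-blocking: fail instead of waiting when the counter is zero.
        if self.value == 0:
            return False
        self.value -= 1
        return True

    def release(self):
        self.value += 1


sem = ToySemaphore(2)
results = [sem.acquire(), sem.acquire(), sem.acquire()]  # third one fails
sem.release()
results.append(sem.acquire())  # succeeds again after the release
print(results, sem.value)
```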

value

Current value of the internal counter.

Type

int

acquire(self, proc, sync=True)

Acquire the semaphore.

Parameters
  • proc (Process) – The process to acquire the semaphore.

  • sync (bool) – Blocking or non-blocking.

Returns

True or False when sync=False, otherwise a 2-tuple (time, reason) to be used with yield.

See Lock.acquire() for more information.

release(reason=LOCK_RELEASED)

Release the semaphore and increase the internal counter by 1.

Parameters

reason (int) – If there is a process currently waiting to acquire the object, it will be activated with the given reason. If more than one process is waiting, only one of them will be activated.

class pesim.lock.RLock

A reentrant lock is a synchronisation primitive that may be acquired multiple times, provided that all the acquisitions are associated with the same object. This is useful when several processes want to cooperate on a task using shared resources, while still blocking other processes from accessing those resources.

One example use case: two pieces of equipment need to go to the same area to finish one task cooperatively. While the task is being carried out, no other equipment can enter this area. We can create an RLock to represent the area, and let the equipment acquire the lock with the task. In this way, once one piece of equipment has successfully acquired the lock, other equipment working on the same task (and therefore acquiring with the same task) can acquire the lock again, but attempts by other equipment (using other tasks) will fail.

There is an additional argument obj in RLock.acquire(), which indicates the associated resources.
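The area example can be sketched with a toy, non-blocking model. ToyRLock is a hypothetical stand-in: it assumes a single release() fully unlocks the lock and compares objects with ==, neither of which is confirmed for pesim's RLock.

```python
class ToyRLock:
    """Hypothetical model: reentrant for acquirers that share the same obj."""

    def __init__(self):
        self.locked = False
        self.owner = None  # obj currently associated with the lock

    def acquire(self, obj):
        # Non-blocking: succeed if unlocked or already held for this obj.
        if not self.locked:
            self.locked, self.owner = True, obj
            return True
        return self.owner == obj

    def release(self):
        # Assumption: one release unlocks and unlinks the object.
        self.locked, self.owner = False, None


area = ToyRLock()
results = [
    area.acquire("task-1"),  # first piece of equipment, task-1
    area.acquire("task-1"),  # second piece of equipment, same task
    area.acquire("task-2"),  # a different task is refused
]
area.release()
results.append(area.acquire("task-2"))  # free again, so task-2 succeeds
print(results)
```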

acquire(self, proc, sync=True, obj=None)

If self is unlocked, the acquiring will associate obj with self and lock it. Subsequent attempts will be successful only if they acquire with the same obj.

Parameters
  • proc (Process) – The process to acquire the lock.

  • sync (bool) – Blocking or non-blocking.

  • obj (object) – An object associated with this acquiring.

Returns

True or False when sync=False, otherwise a 2-tuple (time, reason) to be used with yield.

release(reason=LOCK_RELEASED)

Release the lock and unlink the associated object.

Parameters

reason (int) – If there is a process currently waiting to acquire the lock, it will be activated with the given reason and the new obj will be associated with the lock. If more than one process is waiting with different objects, only one object will be chosen, and all the processes acquiring with this object will be activated.

class pesim.lock.Condition

This class implements condition objects. A condition variable allows one or more processes to wait until they are notified by another process.

Examples

In the following example, a producer generates a new task only after the previous one has been processed:

def consumer(self, task_queue, task_sem, complete_cond):
    while True:
        yield task_sem.acquire(self)
        #get a task from task_queue and process
        complete_cond.set()

def producer(self, task_queue, task_sem, complete_cond):
    while True:
        #add a task to task_queue
        task_sem.release()
        yield complete_cond.wait(self)

value

0 if the condition is not set, 1 if the condition is set.

Type

int

set(reason=LOCK_RELEASED)

Set the condition.

Parameters

reason – Activation reason. When the condition is set, all the processes waiting on the condition will be activated with the given reason.

wait(proc, sync=True)

Wait on a condition.

Parameters
  • proc (Process) – The waiting process.

  • sync (bool) – Blocking or non-blocking.

Returns

True or False when sync=False, otherwise a 2-tuple (time, reason) to be used with yield.

clear()

Unset a condition variable.
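The interplay of set(), wait() and clear() can be sketched with a toy, non-blocking model. ToyCondition is a hypothetical stand-in; in particular, having set() return the list of woken waiters is an illustration device, not pesim's API.

```python
class ToyCondition:
    """Hypothetical model of a condition flag with a list of waiters."""

    def __init__(self):
        self.value = 0
        self.waiting = []

    def wait(self, proc):
        # Non-blocking view: True if already set, else remember the waiter.
        if self.value:
            return True
        self.waiting.append(proc)
        return False

    def set(self):
        # Setting the condition wakes every waiter, not just one.
        self.value = 1
        woken, self.waiting = self.waiting, []
        return woken

    def clear(self):
        self.value = 0


cond = ToyCondition()
before = [cond.wait("p1"), cond.wait("p2")]  # both must wait
woken = cond.set()                           # both are woken together
after_set = cond.wait("p3")                  # already set: no waiting
cond.clear()
after_clear = cond.wait("p4")                # unset again: must wait
print(before, woken, after_set, after_clear)
```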

class pesim.lock.OrderedLock

This class implements ordered lock objects. The OrderedLock enforces the order in which objects get the lock.

Consider the following scenario. A task needs to be operated on by two pieces of equipment, A and B, in order, and both need to access an unsharable resource r. After completing its part of a task, each piece of equipment starts to operate on a new task immediately. If we lock the resource with a normal Lock and let A acquire the lock again for the new task immediately after releasing it for the previous task, it is possible that B will never get the lock. In this case, forcing the order in which objects get the lock can be a solution.

There is an additional method declare_acquiring() that needs to be invoked before actually acquiring the lock. A key must be provided when calling the method. After that, it is guaranteed that objects get the lock in the order of their keys. In the previous case, if we assign each task an increasing key and declare the acquiring for A and B as soon as the task has been generated, B will always get the lock before A gets it for the next task, even if A tries to acquire it again earlier than B.

Examples:

lock = OrderedLock()
lock.declare_acquiring(1)  # key that a will acquire with
lock.declare_acquiring(2)  # key that b will acquire with

In this case, if b acquires first, it will be blocked until a acquires and then releases the lock, even if the lock is unlocked at the time b tries to acquire it.
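The key ordering can be sketched with a toy, non-blocking model built on a min-heap of declared keys. ToyOrderedLock is a hypothetical stand-in that takes no proc argument and does no scheduling; it only shows which acquisitions would succeed.

```python
import heapq


class ToyOrderedLock:
    """Hypothetical model of key-ordered, non-blocking acquisition."""

    def __init__(self):
        self.locked = False
        self.declared = []  # min-heap of declared, not yet acquired keys

    def declare_acquiring(self, key):
        heapq.heappush(self.declared, key)

    def acquire(self, key):
        # Succeed only if unlocked and key is the smallest declared key.
        if self.locked or not self.declared or self.declared[0] != key:
            return False
        heapq.heappop(self.declared)
        self.locked = True
        return True

    def release(self):
        self.locked = False


lock = ToyOrderedLock()
lock.declare_acquiring(1)  # key used by a
lock.declare_acquiring(2)  # key used by b
results = [lock.acquire(2)]      # b first: refused although unlocked
results.append(lock.acquire(1))  # a: succeeds
lock.release()
results.append(lock.acquire(2))  # b: succeeds now
print(results)
```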

acquire(self, proc, sync=True, key=None)

Acquire the lock. The acquiring will be successful only if self is unlocked and the key equals the current smallest key that has been declared and not yet successfully acquired.

Parameters
  • proc (Process) – The process to acquire the lock.

  • sync (bool) – Blocking or non-blocking.

  • key (object) – A key. The key should have been declared.

Returns

True or False when sync=False, otherwise a 2-tuple (time, reason) to be used with yield.

declare_acquiring(self, key)

Declare a future acquiring.

Parameters

key (object) – A key that will be associated with the future acquiring. key must be comparable using <. Future objects will get the lock in the order of key, from smallest to largest.

release(reason=LOCK_RELEASED)

Release the lock.

Parameters

reason (int) – Reason used to activate the next process that gets the lock.

test(self, proc, key)

Test whether proc can successfully acquire the lock with key.

Parameters
  • proc (Process) – A process

  • key (object) – A key

Returns

True or False