Event Loop API

API Reference


The Event Loop API supports the event-driven programming model, which is favoured in Legato (but not forced). Each thread that uses this system has a central event loop which calls event handler functions in response to event reports.

Software components register their event handler functions with the event system (either directly through the Event Loop API or indirectly through other APIs that use the Event Loop API) so the central event loop knows the functions to call in response to defined events.

Every event loop has an event queue, which is a queue of events waiting to be handled by that event loop.

Note
When the process dies, all events, event loops, queues, reports, and handlers will be automatically cleared.

The Event Loop API supports the following usage patterns:

Deferred Function Calls
Dispatching Function Execution to Other Threads
Publish-Subscribe Events
Layered Publish-Subscribe Handlers

Other Legato C Runtime Library APIs using the event loop include:

File Descriptor Monitor API
Timer API
Command Line Arguments API
Signals API
Low-level Messaging API

Deferred Function Calls

A basic Event Queue usage is to queue a function for the Event Loop to call later (when that function gets to the head of the Event Queue) by calling le_event_QueueFunction().

This code sample has a component initialization function queueing another function to be called later by the process's main thread, once the Event Loop is running. The deferred function receives two opaque pointer parameters, which are passed through le_event_QueueFunction(). A parameter that isn't needed can be set to NULL and ignored by the deferred function.

static void MyDeferredFunction
(
    void* param1Ptr,
    void* param2Ptr
)
{
    // Type cast the parameters to what they really are and do whatever it is that
    // I need to do with them.
}

...

COMPONENT_INIT
{
    le_event_QueueFunction(MyDeferredFunction, firstParamPtr, secondParamPtr);
}

Deferred function calls are useful when implementing APIs with asynchronous result call-backs. If an error is detected before the API function returns, it can't just call the call-back directly, because that could cause re-entrancy problems in the client code or cause recursive loops. Instead of forcing the API function to return an error code in special cases (which increases the client's code complexity and may leak API implementation details to the client), the API function can defer executing the call-back until later by queuing an error handling function onto the Event Queue.
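The deferred error call-back idea can be illustrated without the Legato runtime. The following is a minimal sketch in plain C, assuming a toy single-threaded queue: Toy_QueueFunction() and Toy_RunPending() are hypothetical stand-ins for le_event_QueueFunction() and the Event Loop, and StartRequest(), Request_t and ClientCallback() are invented for illustration only. It is not how Legato implements its queue.

```c
#include <assert.h>
#include <stddef.h>

// Toy stand-in for an Event Queue of deferred functions (not Legato code).
typedef void (*ToyDeferredFunc_t)(void* param1Ptr, void* param2Ptr);

typedef struct
{
    ToyDeferredFunc_t func;
    void* param1Ptr;
    void* param2Ptr;
}
ToyQueueEntry_t;

#define TOY_QUEUE_SIZE 8

static ToyQueueEntry_t ToyQueue[TOY_QUEUE_SIZE];
static size_t ToyHead = 0;   // Index of the next entry to run.
static size_t ToyCount = 0;  // Number of entries waiting in the queue.

// Plays the role of le_event_QueueFunction(): remember the call for later.
static void Toy_QueueFunction(ToyDeferredFunc_t func, void* param1Ptr, void* param2Ptr)
{
    assert(ToyCount < TOY_QUEUE_SIZE);
    ToyQueueEntry_t entry = { func, param1Ptr, param2Ptr };
    ToyQueue[(ToyHead + ToyCount) % TOY_QUEUE_SIZE] = entry;
    ToyCount++;
}

// Plays the role of the Event Loop: run queued functions in FIFO order.
// Returns the number of functions that were run.
static size_t Toy_RunPending(void)
{
    size_t ran = 0;
    while (ToyCount > 0)
    {
        ToyQueueEntry_t entry = ToyQueue[ToyHead];
        ToyHead = (ToyHead + 1) % TOY_QUEUE_SIZE;
        ToyCount--;
        entry.func(entry.param1Ptr, entry.param2Ptr);
        ran++;
    }
    return ran;
}

// Hypothetical asynchronous API with a result call-back.
typedef void (*ResultCallback_t)(int status, void* contextPtr);

typedef struct
{
    ResultCallback_t callback;
    void* contextPtr;
    int status;
}
Request_t;

// Deferred function: delivers the stored status to the client's call-back.
static void DeliverResult(void* param1Ptr, void* param2Ptr)
{
    Request_t* requestPtr = param1Ptr;
    (void)param2Ptr;  // Not used.
    requestPtr->callback(requestPtr->status, requestPtr->contextPtr);
}

// Starts a request. A negative input is treated as an error that is detected
// immediately, but the call-back is still deferred instead of being called
// from inside this function. The request object must stay valid until then.
static void StartRequest(Request_t* requestPtr, int input,
                         ResultCallback_t callback, void* contextPtr)
{
    requestPtr->callback = callback;
    requestPtr->contextPtr = contextPtr;
    requestPtr->status = (input < 0) ? -1 : 0;
    Toy_QueueFunction(DeliverResult, requestPtr, NULL);
}

// Example client call-back that records what it was told.
static int CallbackCount = 0;
static int LastStatus = 0;

static void ClientCallback(int status, void* contextPtr)
{
    (void)contextPtr;  // Not used.
    CallbackCount++;
    LastStatus = status;
}
```

In this sketch, calling StartRequest() with a negative input returns without invoking ClientCallback(); the call-back only runs once Toy_RunPending() reaches the queued entry, which is the ordering the client code can rely on.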

Dispatching Function Execution to Other Threads

In multi-threaded programs, sometimes the implementor needs to ask another thread to run a function because:

  • The function to be executed takes a long time, but doesn't have to be done at a high priority.
  • A call needs to be made into a non-thread-safe API function.
  • A blocking function needs to be called, but the current thread can't afford to block.

To assist with this, the Event Loop API provides le_event_QueueFunctionToThread(). It works the same as le_event_QueueFunction(), except that it queues the function onto a specific thread's Event Queue.

If the other thread isn't running the Event Loop, then the queued function will never be executed.

This code sample shows a computation on two arguments that is requested by the process's main thread and executed in the background by a low-priority thread. The result is reported back to the client through a completion callback running in the same thread that requested the computation.

static le_mem_PoolRef_t ComputeRequestPool;
static le_thread_Ref_t LowPriorityThreadRef;

typedef struct
{
    size_t arg1;                                // First argument
    size_t arg2;                                // Second argument
    ssize_t result;                             // The result
    void (*completionCallback)(ssize_t result); // The client's completion callback
    le_thread_Ref_t requestingThreadRef;        // The client's thread.
}
ComputeRequest_t;

// Forward declaration (ProcessResult() is defined further down).
static void ProcessResult(void* param1Ptr, void* param2Ptr);

// Main function of low-priority background thread.
static void* LowPriorityThreadMain
(
    void* contextPtr // not used.
)
{
    // Queued functions only run if this thread runs its Event Loop.
    le_event_RunLoop(); // Never returns.
}

COMPONENT_INIT
{
    ComputeRequestPool = le_mem_CreatePool("Compute Request", sizeof(ComputeRequest_t));

    LowPriorityThreadRef = le_thread_Create("Background Computation Thread",
                                            LowPriorityThreadMain,
                                            NULL);
    le_thread_SetPriority(LowPriorityThreadRef, LE_THREAD_PRIORITY_IDLE);
    le_thread_Start(LowPriorityThreadRef);
}

// This function gets run by a low-priority, background thread.
static void ComputeResult
(
    void* param1Ptr, // request object pointer
    void* param2Ptr  // not used
)
{
    ComputeRequest_t* requestPtr = param1Ptr;

    requestPtr->result = DoSomeReallySlowComputation(requestPtr->arg1, requestPtr->arg2);

    // Hand the finished request back to the thread that asked for it.
    le_event_QueueFunctionToThread(requestPtr->requestingThreadRef,
                                   ProcessResult,
                                   requestPtr,
                                   NULL);
}

// This function gets called by a component running in the main thread.
static void ComputeResultInBackground
(
    size_t arg1,
    size_t arg2,
    void (*completionCallback)(ssize_t result)
)
{
    ComputeRequest_t* requestPtr = le_mem_ForceAlloc(ComputeRequestPool);
    requestPtr->arg1 = arg1;
    requestPtr->arg2 = arg2;
    requestPtr->requestingThreadRef = le_thread_GetCurrent();
    requestPtr->completionCallback = completionCallback;
    le_event_QueueFunctionToThread(LowPriorityThreadRef,
                                   ComputeResult,
                                   requestPtr,
                                   NULL);
}

// This function gets run by the main thread.
static void ProcessResult
(
    void* param1Ptr, // request object pointer
    void* param2Ptr  // not used
)
{
    ComputeRequest_t* requestPtr = param1Ptr;
    requestPtr->completionCallback(requestPtr->result);
    le_mem_Release(requestPtr);
}

Publish-Subscribe Events

In the publish-subscribe pattern, someone publishes information and if anyone cares about that information, they subscribe to receive it. The publisher doesn't have to know whether anything is listening, or how many subscribers might be listening. Likewise, the subscribers don't have to know whether anything is publishing or how many publishers there might be. This decouples publishers and subscribers.

Subscribers add handlers for events and wait for those handlers to be executed.

Publishers report events.

When an event report reaches the front of an Event Queue, the Event Loop will pop it from the queue and call any handlers that have been registered for that event.

Events are identified using an Event ID, which is created by calling le_event_CreateId() before registering a handler for that event or reporting it. Any thread within the process that has the Event ID can register a handler or report events.

le_event_Id_t eventId = le_event_CreateId("MyEvent", sizeof(MyEventReport_t));

Note
These Event IDs are only valid within the process where they were created. The Event Loop API can't be used for inter-process communication (IPC).

Event reports can carry a payload. The size and format of the payload depends on the type of event. For example, reports of temperature changes may need to carry the new temperature. To support this, le_event_CreateId() takes the payload size as a parameter.

To report an event, the publisher builds their report payload in their own buffer and passes a pointer to that buffer (and its size) to le_event_Report():

MyEventReport_t report;
... // Fill in the event report.
le_event_Report(eventId, &report, sizeof(report));

This results in the report getting queued to the Event Queues of all threads with handlers registered for that event ID.

Note
It's okay to have a payload size of zero, in which case NULL can be passed into le_event_Report().

To register a handler, the subscriber calls le_event_AddHandler():

le_event_HandlerRef_t handlerRef = le_event_AddHandler("MyHandler", eventId, MyHandlerFunc);

When an event report reaches the front of a thread's Event Queue, that thread's Event Loop reads the report and then calls the handler functions registered by that thread. A pointer to the report payload is passed to each handler as a parameter. The report payload is deleted on return from the handler, so the handler function must copy any contents that it wants to keep.

static void MyHandlerFunc
(
    void* reportPayloadPtr
)
{
    MyEventReport_t* reportPtr = reportPayloadPtr;
    // Process the report.
    ...
}

Another opaque pointer, called the context pointer, can be set for the handler using le_event_SetContextPtr(). When the handler function is called, it can call le_event_GetContextPtr() to fetch the context pointer.

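static le_event_Id_t MyEventId;
static le_event_HandlerRef_t MyHandlerRef;
static MyContext_t MyContext;   // Hypothetical context object owned by this component.

static void MyHandlerFunc
(
    void* reportPayloadPtr
)
{
    MyEventReport_t* reportPtr = reportPayloadPtr;
    MyContext_t* contextPtr = le_event_GetContextPtr();

    // Process the report.
    ...
}

COMPONENT_INIT
{
    MyEventId = le_event_CreateId("MyEvent", sizeof(MyEventReport_t));

    MyHandlerRef = le_event_AddHandler("MyHandler", MyEventId, MyHandlerFunc);
    // The context pointer must point to data that stays valid while the handler is registered.
    le_event_SetContextPtr(MyHandlerRef, &MyContext);
}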

Finally, le_event_RemoveHandler() can be used to remove an event handler registration, if necessary.

le_event_RemoveHandler(MyHandlerRef);

If a handler is removed after the report for that event has been added to the event queue, but before the report reaches the head of the queue, then the handler will not be called.

Note
To prevent race conditions, it's not permitted for one thread to remove another thread's handlers.
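The payload lifetime and handler removal rules described above can be tried out with a toy model. This is a minimal single-threaded sketch in plain C, assuming a fixed int32_t payload; Toy_AddHandler(), Toy_RemoveHandler(), Toy_Report() and Toy_ProcessOneReport() are hypothetical stand-ins invented for illustration, not Legato functions, and the real Event Loop is more involved (per-thread queues, arbitrary payload sizes).

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_HANDLERS 4
#define MAX_REPORTS  8

typedef void (*ToyHandlerFunc_t)(void* reportPayloadPtr);

static ToyHandlerFunc_t Handlers[MAX_HANDLERS];  // NULL marks a free slot.
static int32_t ReportQueue[MAX_REPORTS];         // Queued copies of payloads.
static size_t QueueHead = 0;
static size_t QueueCount = 0;

// Registers a handler. Returns a slot index used as the handler reference,
// or -1 if no slot is free.
static int Toy_AddHandler(ToyHandlerFunc_t func)
{
    for (int i = 0; i < MAX_HANDLERS; i++)
    {
        if (Handlers[i] == NULL)
        {
            Handlers[i] = func;
            return i;
        }
    }
    return -1;
}

// Removes a previously registered handler.
static void Toy_RemoveHandler(int handlerRef)
{
    assert(handlerRef >= 0 && handlerRef < MAX_HANDLERS);
    Handlers[handlerRef] = NULL;
}

// Queues a copy of the publisher's payload, so the publisher can reuse its
// own buffer as soon as this function returns.
static void Toy_Report(const int32_t* payloadPtr)
{
    assert(QueueCount < MAX_REPORTS);
    ReportQueue[(QueueHead + QueueCount) % MAX_REPORTS] = *payloadPtr;
    QueueCount++;
}

// Pops the report at the head of the queue and calls every handler that is
// still registered at that moment. Returns the number of handlers called.
static size_t Toy_ProcessOneReport(void)
{
    size_t called = 0;

    if (QueueCount == 0)
    {
        return 0;
    }

    int32_t payload = ReportQueue[QueueHead];
    QueueHead = (QueueHead + 1) % MAX_REPORTS;
    QueueCount--;

    for (int i = 0; i < MAX_HANDLERS; i++)
    {
        if (Handlers[i] != NULL)
        {
            Handlers[i](&payload);
            called++;
        }
    }

    // The local payload copy disappears here, so handlers must not keep
    // the pointer they were given.
    return called;
}

// Example subscriber: copies the value it wants to keep.
static int32_t LastTemperature = 0;
static int HandlerCallCount = 0;

static void RecordTemperature(void* reportPayloadPtr)
{
    const int32_t* temperaturePtr = reportPayloadPtr;
    LastTemperature = *temperaturePtr;
    HandlerCallCount++;
}
```

In this model, a report queued while RecordTemperature() is registered is delivered with the value it had at report time, and a report that is still queued when the handler is removed is processed without calling the handler.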

Layered Publish-Subscribe Handlers

If you need to implement an API that allows clients to register "handler" functions to be called-back after a specific event occurs, the Event Loop API provides some special help.

You can have the Event Loop call your handler function (the first-layer handler), which unpacks specific items from the Event Report and calls the client's handler function (the second-layer handler).

For example, you could create a "Temperature Sensor API" that allows its clients to register handler functions to be called to handle changes in the temperature, like this:

// Temperature change handler functions must look like this.
typedef void (*tempSensor_ChangeHandlerFunc_t)(int32_t newTemperature, void* contextPtr);

// Opaque type used to refer to a registered temperature change handler.
typedef struct tempSensor_ChangeHandler* tempSensor_ChangeHandlerRef_t;

// Register a handler function to be called when the temperature changes.
tempSensor_ChangeHandlerRef_t tempSensor_AddChangeHandler
(
    tempSensor_ChangeHandlerFunc_t handlerFunc, // The handler function.
    void* contextPtr // Opaque pointer to pass to handler function.
);

// De-register a handler function that was previously registered using
// tempSensor_AddChangeHandler().
void tempSensor_RemoveChangeHandler
(
    tempSensor_ChangeHandlerRef_t handlerRef
);

The implementation could look like this:

static le_event_Id_t TempChangeEventId;

COMPONENT_INIT
{
    TempChangeEventId = le_event_CreateId("TempChange", sizeof(int32_t));
}

// First-layer handler: unpacks the report and calls the client's handler.
static void TempChangeHandler
(
    void* reportPtr,
    void* secondLayerHandlerFunc
)
{
    int32_t* temperaturePtr = reportPtr;
    tempSensor_ChangeHandlerFunc_t clientHandlerFunc = secondLayerHandlerFunc;

    clientHandlerFunc(*temperaturePtr, le_event_GetContextPtr());
}

tempSensor_ChangeHandlerRef_t tempSensor_AddChangeHandler
(
    tempSensor_ChangeHandlerFunc_t handlerFunc,
    void* contextPtr
)
{
    le_event_HandlerRef_t handlerRef;

    handlerRef = le_event_AddLayeredHandler("TempChange",
                                            TempChangeEventId,
                                            TempChangeHandler,
                                            handlerFunc);
    le_event_SetContextPtr(handlerRef, contextPtr);

    return (tempSensor_ChangeHandlerRef_t)handlerRef;
}

void tempSensor_RemoveChangeHandler
(
    tempSensor_ChangeHandlerRef_t handlerRef
)
{
    le_event_RemoveHandler((le_event_HandlerRef_t)handlerRef);
}

This approach gives strong type checking of both handler references and handler function pointers in code that uses this Temperature Sensor API.

Event Reports Containing Reference-Counted Objects

Sometimes you need to report an event where the report payload points to a reference-counted object allocated from a memory pool (see Dynamic Memory Allocation API). Memory leaks and/or crashes can result if it is sent through the Event Loop API without telling the Event Loop API that it points to a reference-counted object. If there are no subscribers, the Event Loop API discards the reference without releasing it, and the object is never deleted. If multiple handlers are registered, the reference could be released by the handlers too many times. Also, there are other, subtle issues that are nearly impossible to solve if threads terminate while reports containing pointers to reference-counted objects are on their Event Queues.

To help with this, the functions le_event_CreateIdWithRefCounting() and le_event_ReportWithRefCounting() have been provided. These allow a pointer to a reference-counted memory pool object to be sent as the payload of an Event Report.

le_event_ReportWithRefCounting() passes ownership of one reference to the Event Loop API, and when the handler is called, it receives ownership for one reference. It then becomes the handler's responsibility to release its reference (using le_mem_Release()) when it's done.

le_event_CreateIdWithRefCounting() is used the same way as le_event_CreateId(), except that it doesn't require a payload size, because the payload is always a pointer to a reference-counted memory pool object. Only Event IDs created using le_event_CreateIdWithRefCounting() can be used with le_event_ReportWithRefCounting().

static le_event_Id_t EventId;
le_mem_PoolRef_t MyObjectPoolRef;

static void MyHandler
(
    void* reportPtr // Pointer to my reference-counted object.
)
{
    MyObj_t* objPtr = reportPtr;

    // Do something with the object.
    ...

    // Okay, I'm done with the object now.
    le_mem_Release(objPtr);
}

COMPONENT_INIT
{
    EventId = le_event_CreateIdWithRefCounting("SomethingHappened");
    le_event_AddHandler("SomethingHandler", EventId, MyHandler);
    MyObjectPoolRef = le_mem_CreatePool("MyObjects", sizeof(MyObj_t));
}

static void ReportSomethingDetected
(
    ...
)
{
    MyObj_t* objPtr = le_mem_ForceAlloc(MyObjectPoolRef);

    // Fill in the object.
    ...

    // Pass ownership of this reference to the Event Loop API.
    le_event_ReportWithRefCounting(EventId, objPtr);
}
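The ownership rule (one reference handed over by the reporter, one reference received by each handler) can be checked with a small model. The following is a minimal sketch in plain C, assuming synchronous delivery and a hand-rolled reference count; ToyObj_t and the Toy_* functions are hypothetical names invented for illustration and do not reflect how the Legato memory pools or Event Loop are implemented.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

// Toy reference-counted object (stand-in for a memory pool object).
typedef struct
{
    size_t refCount;
    int value;
}
ToyObj_t;

static size_t DeletedCount = 0;  // Number of objects deleted so far.

// Allocates an object holding one reference, owned by the caller.
static ToyObj_t* Toy_Alloc(int value)
{
    ToyObj_t* objPtr = malloc(sizeof(*objPtr));
    assert(objPtr != NULL);
    objPtr->refCount = 1;
    objPtr->value = value;
    return objPtr;
}

static void Toy_AddRef(ToyObj_t* objPtr)
{
    objPtr->refCount++;
}

// Drops one reference and deletes the object when none are left.
static void Toy_Release(ToyObj_t* objPtr)
{
    assert(objPtr->refCount > 0);
    objPtr->refCount--;
    if (objPtr->refCount == 0)
    {
        free(objPtr);
        DeletedCount++;
    }
}

typedef void (*ToyObjHandler_t)(void* reportPtr);

// Takes ownership of one reference from the reporter and gives exactly one
// reference to each handler. With no handlers, the reporter's reference is
// released here, so the object is not leaked.
static void Toy_ReportWithRefCounting
(
    ToyObj_t* objPtr,
    const ToyObjHandler_t* handlers,
    size_t handlerCount
)
{
    // One new reference per handler.
    for (size_t i = 0; i < handlerCount; i++)
    {
        Toy_AddRef(objPtr);
    }

    // Drop the reference that the reporter handed over.
    Toy_Release(objPtr);

    // Each handler now owns one reference and must release it.
    for (size_t i = 0; i < handlerCount; i++)
    {
        handlers[i](objPtr);
    }
}

// Example handler: uses the object, then releases its own reference.
static int SumOfValues = 0;

static void UseAndRelease(void* reportPtr)
{
    ToyObj_t* objPtr = reportPtr;
    SumOfValues += objPtr->value;
    Toy_Release(objPtr);
}
```

With two handlers registered, the object in this model is deleted exactly once, after the second handler releases its reference; with no handlers, it is deleted by the reporting step itself. Those are the two failure cases (over-release and leak) that the reference-counting variants of the API are meant to avoid.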

Miscellaneous Multithreading Topics

All functions in this API are thread safe.

Each thread can have only one Event Loop. The main thread in every Legato process will always run an Event Loop after it's run the component initialization functions. As soon as all component initialization functions have returned, the main thread will start processing its event queue.

When a function is called to "Add" an event handler, that handler is associated with the calling thread's Event Loop. If the calling thread doesn't run its Event Loop, the event reports will pile up in the queue, never getting serviced and never releasing their memory. This will appear in the logs as event queue growth warnings.

If a client starts its own thread (e.g., by calling le_thread_Create() ), then that thread will not automatically run an Event Loop. To make it run an Event Loop, it must call le_event_RunLoop() (which will never return).

If a thread running an Event Loop terminates, the Legato framework automatically deregisters any handlers and deletes the thread's Event Loop, its Event Queue, and any event reports still in that Event Queue.

Integrating with Legacy POSIX Code

Many legacy programs written on top of POSIX APIs will have previously built their own event loop using poll(), select(), or some other blocking functions. It may be difficult to refactor this type of event loop to use the Legato event loop instead.

Two functions are provided to assist integrating legacy code with the Legato Event Loop:

  • le_event_GetFd() - Fetches a file descriptor that can be monitored using some variant of poll() or select() (including epoll). It will appear readable when the Event Loop needs servicing.
  • le_event_ServiceLoop() - Services the event loop. This should be called if the file descriptor returned by le_event_GetFd() appears readable to poll() or select().

In an attempt to avoid starving the caller when there are a lot of things that need servicing on the Event Loop, le_event_ServiceLoop() will only perform one servicing step (i.e., call one event handler function) before returning, regardless of how much work there is to do. It's the caller's responsibility to check the return code from le_event_ServiceLoop() and keep calling until it indicates that there is no more work to be done.
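The "wait for readable, then service until drained" pattern can be sketched with plain POSIX calls. This is a minimal illustration, not Legato code: a pipe stands in for the file descriptor that le_event_GetFd() would return, and the ServiceStepFunc_t call-back is a hypothetical wrapper that would call le_event_ServiceLoop() and translate its return code into "more work remains" (true) or "nothing left" (false). Toy_Post() and Toy_ServiceStep() exist only so the sketch can run on its own.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <sys/types.h>
#include <unistd.h>

// Hypothetical wrapper type: performs ONE servicing step and returns true
// if more work remains. In Legato code this would wrap le_event_ServiceLoop().
typedef bool (*ServiceStepFunc_t)(void);

// Waits up to timeoutMs for eventFd to become readable, then keeps calling
// serviceStep() until it reports that there is no more work to do.
// Returns the number of servicing steps performed, or -1 if poll() failed.
static int WaitAndService(int eventFd, ServiceStepFunc_t serviceStep, int timeoutMs)
{
    struct pollfd pfd = { .fd = eventFd, .events = POLLIN };
    int steps = 0;

    int ready = poll(&pfd, 1, timeoutMs);
    if (ready < 0)
    {
        return -1;
    }

    if ((ready > 0) && (pfd.revents & POLLIN))
    {
        bool moreWork;
        do
        {
            moreWork = serviceStep();
            steps++;
        }
        while (moreWork);
    }

    return steps;
}

// --- Toy stand-ins so the sketch can run without the Legato runtime ---

static int ToyPipe[2];      // [0] = read end (the "event fd"), [1] = write end.
static int ToyPending = 0;  // Work items waiting to be serviced.
static int ToyHandled = 0;  // Work items serviced so far.

// Queues one work item and makes the read end of the pipe readable.
static void Toy_Post(void)
{
    char byte = 1;
    ssize_t written = write(ToyPipe[1], &byte, 1);
    assert(written == 1);
    (void)written;
    ToyPending++;
}

// Services exactly one work item, like one call to a servicing function.
static bool Toy_ServiceStep(void)
{
    if (ToyPending == 0)
    {
        return false;
    }

    char byte;
    ssize_t got = read(ToyPipe[0], &byte, 1);
    assert(got == 1);
    (void)got;

    ToyPending--;
    ToyHandled++;
    return (ToyPending > 0);
}
```

A legacy loop would normally put the event file descriptor into the same poll() set as its other descriptors rather than polling it alone. The single-descriptor form is used here only to keep the sketch short.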

Troubleshooting

A logging keyword can be enabled to view a given thread's event handling activity. The keyword name depends on the thread and process name where the thread is located. For example, the keyword "P/T/events" controls logging for a thread named "T" running inside a process named "P".