Asynchronous Structure Tutorial
In this chapter we introduce a new kind of structure: the asynchronous structure. An asynchronous structure can have input, output, store and static properties, just like a synchronous structure. The major difference is the semantics of setting and getting input and output properties. For an asynchronous structure, the semantics of input and output are looser. All that is defined is that data flows into inputs and out of outputs; there is no consistency pairing between input and output. Data flowing in and out of an asynchronous structure is queued, whereas in a synchronous structure a new property value overwrites the previous one. Store and static property values of an asynchronous structure are not queued; queuing applies only to inputs and outputs. An asynchronous structure behaves like a pipe, while a synchronous structure behaves more like a function.
The purpose of the asynchronous structure is to model asynchronous components in a system. Examples of asynchronous components include network messaging and interactive user interfaces. Large systems are also more likely to behave in an asynchronous way.
In the actual implementation of Luban, an asynchronous structure is an object with a live thread inside. The internal thread takes data from the input queue and puts results onto the output queue, while outside users can put new data onto the input queue and take results from the output queue. So an asynchronous structure does multi-threading, only implicitly.
namespace demo;
asynch struct multiplexer
(
    input flow1, flow2;
    output merged;
)
as process
{
    output.merged = input.flow1 &
    output.merged = input.flow2 &
}
The above example defines an asynchronous structure named “multiplexer”. The structure merges data from the inputs “flow1” and “flow2” into the output “merged”, just like a telecom multiplexer. Inside “multiplexer” there are two user-coded threads, one reading “flow1” and the other reading “flow2”; both put what they read into “merged”. Several details of asynchronous structures make this work.
First, every read of an input property removes one value from the input queue. When the queue is empty, the read blocks and waits until a new value arrives. An input property cannot be read from outside the structure. Recall that in a synchronous structure, repeated reads of the same input property return the same value.
Second, every assignment to an output property adds one value to the output queue. For the same operation, a synchronous structure keeps only the last value assigned to the output property.
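For readers more familiar with general-purpose languages, the two details above can be illustrated with a rough Python analogy. This is only a sketch and not how Luban is implemented: the class name OverwriteSlot is hypothetical, and the standard library queue.Queue merely stands in for a queued property.

```python
import queue

class OverwriteSlot:
    """Hypothetical stand-in for a synchronous property: last write wins."""
    def __init__(self):
        self.value = None

    def set(self, v):
        self.value = v          # a new value overwrites the previous one

    def get(self):
        return self.value       # repeated reads return the same value

slot = OverwriteSlot()          # synchronous-style property
q = queue.Queue()               # asynchronous-style (queued) property

for v in (1, 2, 3):
    slot.set(v)
    q.put(v)                    # every assignment adds one value to the queue

sync_reads = [slot.get(), slot.get()]
async_reads = [q.get(), q.get(), q.get()]   # every read removes one value

# A read on an empty queue waits; a timeout is used here only so the
# sketch terminates instead of blocking forever.
try:
    q.get(timeout=0.1)
    read_would_block = False
except queue.Empty:
    read_would_block = True

print(sync_reads, async_reads, read_would_block)
```

The overwrite slot only remembers the last value, while the queue hands back every value once, in arrival order.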
With these details understood, we can code a simple producer and consumer case in which the producer and the consumer share one queue for communication.
namespace demo;
asynch struct simplequeue
(
    input inq;
    output outq;
)
as process
{
    output.outq = input.inq;
}
The above component behaves like a simple multi-threaded queue, with one end to put data in and another end to read data out; it also makes the reader wait when the queue is empty. We can then use the component in a sample script like the one below:
// ……
q = demo::simplequeue();
{ obj = myns::producer().produce(); q.inq = obj; } &  // producer thread
{ myns::consume( obj = q.outq ); } &                  // consumer thread
// ……
The above script starts two threads: one produces an object and puts it into the “inq” of the queue named “q”, while the other reads an object out of the “outq” of “q” and feeds it to the “myns::consume” structure. When “outq” is empty, the consumer thread sleeps.
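The same producer and consumer arrangement can be sketched in Python as a rough analogy. Everything here is an assumption made for illustration: the SimpleQueue class, the producer and consumer functions, and the sample items are hypothetical, and a daemon thread forwarding between two queue.Queue objects only approximates the live thread inside a Luban asynchronous structure.

```python
import queue
import threading

class SimpleQueue:
    """Hypothetical stand-in for demo::simplequeue."""
    def __init__(self):
        self.inq = queue.Queue()     # queued input
        self.outq = queue.Queue()    # queued output
        # The internal thread moves each value from input to output.
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            self.outq.put(self.inq.get())   # blocks while inq is empty

def producer(q, items):
    for item in items:
        q.inq.put(item)

def consumer(q, count, sink):
    for _ in range(count):
        sink.append(q.outq.get())    # sleeps while outq is empty

q = SimpleQueue()
items = ["a", "b", "c"]
received = []
consumer_thread = threading.Thread(target=consumer, args=(q, len(items), received))
producer_thread = threading.Thread(target=producer, args=(q, items))
consumer_thread.start()              # the consumer may start first and simply waits
producer_thread.start()
producer_thread.join()
consumer_thread.join()
print(received)
```

Because a single internal thread forwards values in first-in, first-out order, the consumer receives the items in the order the producer sent them.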
The above examples show how to code an asynchronous structure and use it in Luban script. The next section will show how to code and use asynchronous structures in composition.
In chapter one, when we introduced thread dispatching, we gave an example showing a nasty surprise when threads are dispatched uncontrolled in a loop. Yet that kind of loop is commonly used in certain situations. For example, you could write a server that watches a network port and dispatches a thread to handle each received message. Such a server is simply an endless loop with thread dispatching inside, as the code below shows.
while ( true )
{
x = port.getmsg();
::msghandler(msg=x) &
}
The above code is problematic because the execution order of the main thread and the dispatched thread is not deterministic. The main loop could run so fast that the message “x” is changed before the “::msghandler” thread reads it. The fix is to use an asynchronous structure, as in the code below.
handler = ::asynchmsghandler();
while ( true )
{
x = port.getmsg();
handler.msg=x;
}
And you define “asynchmsghandler” as below:
asynch struct asynchmsghandler(input msg;)
as process
{
// handle msg;
}
This works because the object “handler” is an asynchronous structure with a thread inside, so all inputs set on “handler” are correctly queued and processed in order.
In chapter one, we described that at the end of script execution the main thread waits for all explicitly dispatched threads to finish. The Luban engine gives no special treatment to the live thread inside an asynchronous structure; it simply destroys the structure like any other object. Destroying an asynchronous structure stops its internal thread even if it is in the middle of running. The scripts below show these semantics.
asynchprinter = demo::AsynchPrinter();
for( i=1; i<11; ++i)
    asynchprinter.obj = i;
And demo::AsynchPrinter is defined as below.
namespace demo;
asynch struct AsynchPrinter(input obj;)
as process
{
    std::println(obj = input.obj);
}
It may surprise some users that the above script may print nothing instead of the complete list of integers from 1 to 10. The reason is that the main thread destroys the “asynchprinter” object before its internal thread can finish its job. Modifying the script to call the “waitfinish” member function of the asynchronous structure fixes the problem.
asynchprinter = demo::AsynchPrinter();
for( i=1; i<11; ++i)
    asynchprinter.obj = i;
asynchprinter.waitfinish();
The waitfinish member function returns only when the input queue of the structure is empty and at least one thread inside is sleeping, waiting for new input. In our example, it guarantees that all objects from 1 to 10 are printed before the script quits.
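A similar wait-until-drained pattern exists in Python and may help make the idea concrete. The sketch below is an analogy under stated assumptions, not Luban's mechanism: the AsynchPrinter class and its waitfinish method are hypothetical Python stand-ins, and the behavior comes from the standard library calls queue.Queue.join() and task_done(). The daemon thread loosely mirrors an internal thread that is simply abandoned when the script ends.

```python
import queue
import threading

class AsynchPrinter:
    """Hypothetical Python stand-in for demo::AsynchPrinter."""
    def __init__(self):
        self.obj = queue.Queue()     # queued input property
        self.printed = []            # record of what has been printed
        # daemon=True: the thread does not keep the program alive at exit.
        threading.Thread(target=self._run, daemon=True).start()

    def _run(self):
        while True:
            value = self.obj.get()   # blocks while the queue is empty
            print(value)
            self.printed.append(value)
            self.obj.task_done()     # mark this input as fully processed

    def waitfinish(self):
        # Returns once every queued input has been processed.
        self.obj.join()

printer = AsynchPrinter()
for i in range(1, 11):
    printer.obj.put(i)
printer.waitfinish()                 # without this, the program could exit early
```

Without the final call, the main thread could reach the end of the program while values are still waiting in the queue, which is the Python counterpart of the script printing nothing.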
We already know that you can trigger the evaluation of a synchronous structure by setting its input property and/or by explicitly making a structure call, and we expect the evaluation to finish before the execution flow goes any further.
An asynchronous structure is different by nature since it has a live thread inside. The following script describes the basic behavior of an asynchronous structure.
x = demo::asynchinput();  // no thread started here
y = x.newinput;           // this will trigger the internal thread running in x
x.start();                // this will start the internal thread of x too
x.waitfinish();           // this will block until the internal thread of x becomes inactive
y = x;                    // y will also be an asynch struct like x, but with no i/o queue and inactive
y = x( input1 = 0.0);     // y will be an error value because x is an asynch struct
Asynchronous structures and composition work together naturally. In process script the user needs to pay attention to the semantic differences of an asynchronous structure, but in composition the user wires an asynchronous structure the same way as a synchronous one. Luban takes care of the independent thread inside the asynchronous structure automatically.
In the example below, we code an asynchronous structure that listens for network messages. Then we use the structure in a composition.
namespace demo;
asynch struct messenger
(
    output msg;
)
as process
{
    socket = net::socket("msgserver", 6500);
    while ( true )
    {
        obj = socket.readobj();
        if ( obj == "good bye" ) break;
        output.msg = obj;
    }
}
struct listenNprint
()
as composition
{
    MSG: ::messenger();
    PRINTER: std::println(obj = MSG.msg );
}
The above code creates an asynchronous structure that listens to port 6500 on the server “msgserver” and sets the output “msg” whenever an object arrives from the server. The “messenger” structure stops only when it hears “good bye” from the server. The composition that follows uses this structure as a component and wires it to a structure that prints messages on the console. We can run the composition structure using the script below:
demo::listenNprint(=);
When the composition “listenNprint” runs, it listens for messages on the network and prints what it hears on the console until the server says “good bye” to it.
You can put an ad hoc asynchronous cell into a composition by simply declaring the ad hoc process cell as “asynch”. The code inside the cell will follow the asynchronous structure semantics automatically. We can code the “listenNprint” structure using an ad hoc asynchronous cell as below:
namespace demo;
struct listenNprint
()
as composition
{
    asynch MSG: {
        socket = net::socket("msgserver", 6500);
        while ( true )
        {
            obj = socket.readobj();
            if ( obj == "good bye" ) break;
            MSG = obj;
        }
    }
    PRINTER: std::println(obj = MSG );
}
Notice that in the ad hoc cell there is no output property to set except the cell itself, and its fellow component cells can only refer to the value of the cell itself instead of an output property.
As mentioned in the previous chapter, a cycle consisting of only synchronous cells is not allowed in a composition. In other words, if the cycle contains one or more asynchronous components, it is OK for Luban. Let’s look at the example below:
namespace demo;
asynch struct player
(
    input receive;
    output send;
)
as process
{
    output.send = 1; // send a ball
    while ( true )
    {
        ball = input.receive;
        std::println(obj=ball);
        if ( ball == 1000 ) break;
        ball++;
        output.send = ball;
    }
}
struct pingpong
()
as composition
{
    P1: ::player(receive = P2.send );
    P2: ::player(receive = P1.send );
}
The composition structure “pingpong” obviously has a cycle involving cells P1 and P2. Yet Luban will happily accept it and run it, because P1 and P2 are asynchronous components and so do not violate Luban’s no-cycle rule.
If you run the “pingpong” structure, it will print the numbers 1 to 1000, each number twice in a row.
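To see why each number appears twice in a row, the cycle can be imitated in Python with two threads and two queues. This is a hedged sketch of the idea, not a model of Luban's scheduler: the player function, the queue names and the LIMIT constant are all hypothetical, and values are collected in a list so the order can be inspected.

```python
import queue
import threading

LIMIT = 5                  # the tutorial uses 1000; a small value keeps output short
printed = []               # stands in for console output
print_lock = threading.Lock()

def player(receive, send):
    send.put(1)                        # send a ball
    while True:
        ball = receive.get()           # blocks until the other player sends
        with print_lock:
            printed.append(ball)
        if ball == LIMIT:
            break
        send.put(ball + 1)

p1_to_p2 = queue.Queue()
p2_to_p1 = queue.Queue()
p1 = threading.Thread(target=player, args=(p2_to_p1, p1_to_p2))
p2 = threading.Thread(target=player, args=(p1_to_p2, p2_to_p1))
p1.start()
p2.start()
p1.join()
p2.join()
print(printed)             # prints [1, 1, 2, 2, 3, 3, 4, 4, 5, 5]
```

Each player can record a number k only after the other player has recorded k - 1 and sent k, so both copies of a number are always recorded before either copy of the next one. The cycle does not deadlock because each player sends its first ball before it waits to receive.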