Project

General

Profile

Bug #1346 » bt_plugin_bar.py

Francis Deslauriers, 02/15/2022 11:15 AM

 
import bt2

# Register this module with bt2 as a Python plugin named "reprod".
bt2.register_plugin(__name__, "reprod")

# Message iterator used by MyFirstSource: it delivers a stream beginning
# message, a single event message and a stream end message, in that order.
class MyFirstSourceIter(bt2._UserMessageIterator):
    def __init__(self, config, output_port):
        """Prepare every message this iterator will ever deliver.

        The event class is taken from the output port's user data; the
        stream class and trace class are reached from it. A trace and a
        stream are instantiated, one event message is created and its
        payload is filled, and the three messages are queued in
        ``self._msgs``.
        """
        event_cls = output_port.user_data
        stream_cls = event_cls.stream_class
        trace_cls = stream_cls.trace_class

        trace = trace_cls()
        stream = trace.create_stream(stream_cls)

        event_msg = self._create_event_message(event_cls, stream)

        # (length, elements) for each of the two entries of the static
        # array; for every entry the length member is written before the
        # dynamic array member.
        entry_contents = (
            (2, [23, 6]),
            (1, [7]),
        )
        for index, (length, elements) in enumerate(entry_contents):
            entry = event_msg.event.payload_field['the_static_array'][index]
            entry['the_length'] = length
            entry['the_dynamic_array'] = elements

        self._msgs = [
            self._create_stream_beginning_message(stream),
            event_msg,
            self._create_stream_end_message(stream),
        ]

    def __next__(self):
        """Return the next queued message.

        Raises:
            StopIteration: When no queued message is left.
        """
        if not self._msgs:
            raise StopIteration
        return self._msgs.pop(0)


# Source component class exposing one output port, "some-name", whose user
# data is the "my-event" event class consumed by MyFirstSourceIter.
# NOTE(review): kept as a comment rather than a class docstring on purpose;
# bt2 is understood to use a component class's docstring as its
# description/help text, which would change the registered metadata.
@bt2.plugin_component_class
class MyFirstSource(
    bt2._UserSourceComponent, message_iterator_class=MyFirstSourceIter
):
    def _define_event_payload_fc(self, tc):
        """Build and return the event payload field class.

        The payload is a structure with one member, 'the_static_array': a
        static array of 2 structures. Each inner structure has an unsigned
        integer member 'the_length' and a dynamic array member
        'the_dynamic_array' (unsigned integer elements) created with
        'the_length' as its length field class.

        Args:
            tc: Trace class used to create all the field classes.
        """
        entry_fc = tc.create_structure_field_class()
        length_fc = tc.create_unsigned_integer_field_class()
        element_fc = tc.create_unsigned_integer_field_class()
        dynamic_array_fc = tc.create_dynamic_array_field_class(element_fc, length_fc)

        # Member order matters here: the length member is appended before
        # the dynamic array member.
        entry_fc += [
            ('the_length', length_fc),
            ('the_dynamic_array', dynamic_array_fc),
        ]

        static_array_fc = tc.create_static_array_field_class(entry_fc, 2)

        payload_fc = tc.create_structure_field_class()
        payload_fc += [('the_static_array', static_array_fc)]

        return payload_fc

    def __init__(self, config, params, obj):
        """Create the trace, stream and event classes and the output port.

        The event class is named "my-event" and is attached to the
        "some-name" output port as its user data.
        """
        trace_cls = self._create_trace_class()
        stream_cls = trace_cls.create_stream_class()
        payload_fc = self._define_event_payload_fc(trace_cls)
        event_cls = stream_cls.create_event_class(
            name="my-event", payload_field_class=payload_fc
        )

        self._add_output_port("some-name", event_cls)


# Sink component class that prints a one-line summary of each message it
# consumes from its single input port, "some-name".
# NOTE(review): kept as a comment rather than a class docstring on purpose;
# bt2 is understood to use a component class's docstring as its
# description/help text, which would change the registered metadata.
@bt2.plugin_component_class
class MyFirstSink(bt2._UserSinkComponent):
    def __init__(self, config, params, obj):
        """Add the "some-name" input port and remember it."""
        self._port = self._add_input_port("some-name")

    def _user_graph_is_configured(self):
        """Create the message iterator on the input port added in __init__."""
        self._it = self._create_message_iterator(self._port)

    def _user_consume(self):
        """Consume one message and print it.

        Prints "Stream beginning" / "Stream end" for stream messages and
        "event <name>, timestamp <value>" for event messages. The timestamp
        is printed as ``None`` when the event's stream class has no default
        clock class.

        Raises:
            RuntimeError: If the message is of any other type.
        """
        msg = next(self._it)

        if isinstance(msg, bt2._StreamBeginningMessageConst):
            print("Stream beginning")
        elif isinstance(msg, bt2._EventMessageConst):
            # Reading `default_clock_snapshot` raises when the stream class
            # has no default clock class, so check for one first instead of
            # crashing on clock-less streams.
            if msg.event.stream.cls.default_clock_class is None:
                ts = None
            else:
                ts = msg.default_clock_snapshot.value

            name = msg.event.name
            print("event {}, timestamp {}".format(name, ts))
        elif isinstance(msg, bt2._StreamEndMessageConst):
            print("Stream end")
        else:
            raise RuntimeError("Unhandled message type", type(msg))
    (1-1/1)