root/ossiedev/branches/jgaeddert/0.8.0/components/PacketSink/PacketSink.cpp @ 9867

Revision 9867, 5.1 KB (checked in by jgaeddert, 3 years ago)

cleaning up packet source/sink

Line 
1/****************************************************************************
2
3Copyright 2010 by your_name_or_organization, all rights reserved.
4
5****************************************************************************/
6
7
8#include <string>
9#include <iostream>
10#include "PacketSink.h"
11
12PacketSink_i::PacketSink_i(const char *uuid, omni_condition *condition) :
13    Resource_impl(uuid), component_running(condition)
14{
15    headerDataPortIn = new standardInterfaces_i::realChar_p("HeaderDataIn");
16    payloadDataPortIn = new standardInterfaces_i::realChar_p("PayloadDataIn");
17    verbose = true;
18    start();
19}
20
21PacketSink_i::~PacketSink_i(void)
22{
23    delete headerDataPortIn;
24    delete payloadDataPortIn;
25}
26
27// Static function for omni thread
28void PacketSink_i::Run( void * data )
29{
30    ((PacketSink_i*)data)->ProcessData();
31}
32
33CORBA::Object_ptr PacketSink_i::getPort( const char* portName ) throw (
34    CORBA::SystemException, CF::PortSupplier::UnknownPort)
35{
36    DEBUG(3, PacketSink, "getPort() invoked with " << portName)
37
38    CORBA::Object_var p;
39
40    p = headerDataPortIn->getPort(portName);
41
42    if (!CORBA::is_nil(p))
43        return p._retn();
44
45    p = payloadDataPortIn->getPort(portName);
46
47    if (!CORBA::is_nil(p))
48        return p._retn();
49
50    /*exception*/
51    throw CF::PortSupplier::UnknownPort();
52}
53
54void PacketSink_i::start() throw (CORBA::SystemException,
55    CF::Resource::StartError)
56{
57    DEBUG(3, PacketSink, "start() invoked")
58    omni_mutex_lock  l(processing_mutex);
59    if( false == thread_started )
60    {
61        thread_started = true;
62        // Create the thread for the writer's processing function
63        processing_thread = new omni_thread(Run, (void *) this);
64
65        // Start the thread containing the writer's processing function
66        processing_thread->start();
67    }
68}
69
70void PacketSink_i::stop() throw (CORBA::SystemException, CF::Resource::StopError)
71{
72    DEBUG(3, PacketSink, "stop() invoked")
73    omni_mutex_lock l(processing_mutex);
74    thread_started = false;
75}
76
77void PacketSink_i::releaseObject() throw (CORBA::SystemException,
78    CF::LifeCycle::ReleaseError)
79{
80    DEBUG(3, PacketSink, "releaseObject() invoked")
81
82    component_running->signal();
83}
84
85void PacketSink_i::initialize() throw (CF::LifeCycle::InitializeError,
86    CORBA::SystemException)
87{
88    DEBUG(3, PacketSink, "initialize() invoked")
89}
90
91void PacketSink_i::query( CF::Properties & configProperties ) throw (CORBA::SystemException, CF::UnknownProperties)
92{
93    if( configProperties.length() == 0 )
94    {
95        configProperties.length( propertySet.length() );
96        for( unsigned int i = 0; i < propertySet.length(); i++ )
97        {
98            configProperties[i].id = CORBA::string_dup( propertySet[i].id );
99            configProperties[i].value = propertySet[i].value;
100        }
101        return;
102    } else {
103        for( unsigned int i = 0; i < configProperties.length(); i++ ) {
104            for( unsigned int j = 0; j < propertySet.length(); j++ ) {
105                if( strcmp(configProperties[i].id, propertySet[i].id) == 0 ) {
106                    configProperties[i].value = propertySet[i].value;
107                }
108            }
109        }
110    } // end if-else
111}
112
113void PacketSink_i::configure(const CF::Properties& props)
114throw (CORBA::SystemException,
115    CF::PropertySet::InvalidConfiguration,
116    CF::PropertySet::PartialConfiguration)
117{
118    DEBUG(3, PacketSink, "configure() invoked")
119
120    static int init = 0;
121    if( init == 0 ) {
122        if( props.length() == 0 ) {
123            std::cout << "configure called with invalid props.length - " << props.length() << std::endl;
124            return;
125        }
126        propertySet.length(props.length());
127        for( unsigned int j=0; j < props.length(); j++ ) {
128            propertySet[j].id = CORBA::string_dup(props[j].id);
129            propertySet[j].value = props[j].value;
130        }
131        init = 1;
132    }
133
134    std::cout << "props length : " << props.length() << std::endl;
135
136    for ( unsigned int i = 0; i <props.length(); i++)
137    {
138        std::cout << "Property id : " << props[i].id << std::endl;
139
140        if (strcmp(props[i].id, "DCE:5c849ed4-0f34-11df-baab-001aa089d644") == 0)
141        {
142        }
143    }
144}
145
146void PacketSink_i::ProcessData()
147{
148    DEBUG(3, PacketSink, "ProcessData() invoked")
149
150
151
152    PortTypes::CharSequence *headerData(NULL);
153    CORBA::UShort headerDataLength;
154
155    PortTypes::CharSequence *payloadData(NULL);
156    CORBA::UShort payloadDataLength;
157
158    while(continue_processing())
159    {
160        // read header data
161        headerDataPortIn->getData(headerData);
162        headerDataLength = headerData->length();
163
164        // read payload data
165        payloadDataPortIn->getData(payloadData);
166        payloadDataLength = payloadData->length();
167
168        // strip packet id
169        packet_id = (*headerData)[0];
170
171        if (verbose) {
172            printf("PacketSink received packet [%4u] %4u bytes\n",
173                    packet_id,
174                    payloadDataLength);
175        }
176
177        headerDataPortIn->bufferEmptied();
178        payloadDataPortIn->bufferEmptied();
179    }
180}
181
182bool PacketSink_i::continue_processing()
183{
184    omni_mutex_lock l(processing_mutex);
185    return thread_started;
186}
187
188
Note: See TracBrowser for help on using the browser.