/* (c) Copyright, Real-Time Innovations, 2012.  All rights reserved. 
 * Author:  Gerardo Pardo-Castellote 
 */

package com.rti.dds.snippets;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ConcurrentSkipListMap;

import com.rti.dds.cdr.CdrBuffer;
import com.rti.dds.cdr.CdrInputStream;
import com.rti.dds.cdr.CdrOutputStream;
import com.rti.dds.domain.*;
import com.rti.dds.infrastructure.*;
import com.rti.dds.publication.builtin.*;
import com.rti.dds.subscription.*;
import com.rti.dds.subscription.builtin.*;
import com.rti.dds.typecode.TypeCode;
import com.rti.dds.typecode.TypeCodeFactory;
import com.rti.dds.util.NativeInterface;
import com.rti.ndds.config.Version;

public class MonitorDicoveredTypes {
    private static int verbosity  = 1;
    
    private int domainId;
    private DomainParticipant participant;
    private PublicationBuiltinTopicDataDataReader   publicationsDR;
    private SubscriptionBuiltinTopicDataDataReader  subscriptionsDR;
      
    private final static int MAX_ACTIVE_CONDITIONS = 3; // We will only install e conditions on the
    private ConditionSeq activeConditionSeq;
    private WaitSet discoveryWaitSet;
    
    // This will hold the discovered types
    private ConcurrentSkipListMap<String, TypeCode> discoveredTypes;
    
  
    public boolean start(int theDomainId) {        
        Version version = Version.get_instance ();
        System.out.println("Running RTI Connext version: " + version);
        
        discoveredTypes = new ConcurrentSkipListMap<String, TypeCode>();
               
        domainId = theDomainId;
        DomainParticipantFactory factory = DomainParticipantFactory.get_instance();
        DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
        
        // This instructs the DomainParticipantFactory to not enable the DomainParticipant
        // entities it creates automatically. This is needed so that we have a chance to
        // retrieve the builtin data-readers before the participant starts receiving
        // discovery data. Later it is explained why this is needed
        factoryQos.entity_factory.autoenable_created_entities = false;
        
        DomainParticipantQos pQos = new DomainParticipantQos();
        factory.get_default_participant_qos(pQos);
        pQos.participant_name.name = "RTI Connext Monitor Types Snippet";
        try {
            participant = factory.create_participant(
                    domainId, 
                    pQos, //DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT, 
                    null, // listener
                    StatusKind.STATUS_MASK_NONE);
        } catch ( Exception e) {
            String lastStartError = "Error creating the DDS domain. Common causes are:"
                + "\n   - Lack of a network. E.g disconected wireless."
                + "\n   - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface "
                + "\n      for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead.";
            
            System.out.println(lastStartError);
            return false;
        }
   
        // We count ourselves as a participant that is present

        // The "lookup_xxx" operations not only retrieve the builtin data-readers but also
        // activate the cacheing of discovered types. To save resources discovered types
        // are only saved in the builtin reader cache which is only active after calling
        // the corresponding "lookup_xxx" operation. 
        // It is for this reason that we instructed the DomainParticipantFactory to not
        // automatically enabled the DomainParticipant. This gives us the opportuity to
        // retrieve the builtin entities and activate the cacheing of discovered types
        // beforfe we receive any discovery information. If we did not do this we may
        // miss the data-types of the first few entities discovered        
        publicationsDR = (PublicationBuiltinTopicDataDataReader) 
            participant.get_builtin_subscriber().lookup_datareader("DCPSPublication");
        
        subscriptionsDR = (SubscriptionBuiltinTopicDataDataReader) 
            participant.get_builtin_subscriber().lookup_datareader("DCPSSubscription");
  
        
        // Enable the participant. This causes it to start receiving discovery traffic.  
        // Note: Enable fails if there is no network of if any interfaces are not multicast enabled
        // In some platforms (e.g. MacOSX) having the VPN running causes enable() to fail
        try {
            participant.enable(); 
        } catch ( Exception e) {
            String lastStartError = "Error enabling the DDS domain. Common causes are:"
                + "\n   - Lack of a network. E.g disconected wireless."
                + "\n   - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface "
                + "\n      for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead.";
            
            System.out.println(lastStartError);
            return false;
        }
        
        // Create a WairSet object that can be used to block the calling thread until there is
        // discovery data to read. This avoids having to poll and this use CPU continuously. 
        discoveryWaitSet = new WaitSet();
        
        // Attach the conditions that would wakeup the waitset. In this case the arrival of data on
        // any of the builting datareaders      
        discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition());
        publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);
        
        discoveryWaitSet.attach_condition(subscriptionsDR.get_statuscondition());
        subscriptionsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);
        
        activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS);

        return true;
    }

    public void waitForDiscoveryData() {
        Duration_t waitDuration = 
            new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC);
        
        System.out.println("waitForDiscoveryData");
        discoveryWaitSet.wait(activeConditionSeq, waitDuration);
    }
  
    /**
     * Processes all the new TypeCodes discovered entering the new ones
     * into an internal map organized by their type_name and reporting situations
     * where the same type_name is used with different definitions of the TypeCode
     * 
     * @return A list with the TypeCodes that were entered into the internal map
     *  that is the ones that correspond to a type_name that was not seen before
     */
    public ArrayList<TypeCode> processDiscoveredTypes() {
        ArrayList<TypeCode> newTypeCodes = new ArrayList<TypeCode>();
        
        System.out.println("processDiscoveredTypes");
        if ( publicationsDR.get_statuscondition().get_trigger_value() ) {
            processTypesInDiscoveredDataWriters(newTypeCodes);
        }
        if ( subscriptionsDR.get_statuscondition().get_trigger_value() ) {
            processTypesInDiscoveredDataReaders(newTypeCodes);
        }       
        
        return newTypeCodes;
    }
    
    private void processTypesInDiscoveredDataReaders(ArrayList<TypeCode> newTypeCodes) {
        SubscriptionBuiltinTopicData subscriptionData = new SubscriptionBuiltinTopicData();;
        SampleInfo info = new SampleInfo();;
        
        try {
            while ( true ) {
                subscriptionsDR.take_next_sample(subscriptionData, info);
                
                // We are only interested in new DataReaders which we have not
                // seen before. This is because the data-type will never change for any
                // one DataReader to other updates indicating change of QoS or deletion of the
                // DataReader can be safely ignored
                if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) {
                   System.out.println("DataReader (New)" +
                            " name: \""  + subscriptionData.subscription_name.name + "\"" +
                            " topic: \"" + subscriptionData.topic_name + "\"" +
                            " type: \""  + subscriptionData.type_name + "\"" );

                   if (processType(subscriptionData.type_name, subscriptionData.type_code)) {
                       newTypeCodes.add(subscriptionData.type_code);
                   }
                }
             }
        }
        catch (RETCODE_NO_DATA noData) {    } 
        // catch (RETCODE_BAD_PARAMETER badParam) {   }
        finally {  }                
    }

    private void processTypesInDiscoveredDataWriters(ArrayList<TypeCode> newTypeCodes) {
        PublicationBuiltinTopicData publicationData = new PublicationBuiltinTopicData();
        SampleInfo info = new SampleInfo();
       
        try {
            while ( true ) {
                publicationsDR.take_next_sample(publicationData, info);
                
                // We are only interested in new DataWriters which we have not
                // seen before. This is because the data-type will never change for any
                // one DataWriter to other updates indicating change of QoS or deletion of the
                // DataWriter can be safely ignored
                if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) {
                   System.out.println("DataWriter (New)" +
                            " name: \""  + publicationData.publication_name.name + "\"" +
                            " topic: \"" + publicationData.topic_name + "\"" +
                            " type: \""  + publicationData.type_name + "\"" );

                   if ( processType(publicationData.type_name, publicationData.type_code) ) {
                       newTypeCodes.add(publicationData.type_code);
                   }
                }
             }
        }
        catch (RETCODE_NO_DATA noData) {    } 
        // catch (RETCODE_BAD_PARAMETER badParam) {   }
        finally {  }        
    }

    /** This function illustrates how to serialize a TypeCode in Java such that it can be saved in 
     * a file or sent out-of band.
     *
     * @param typeCode a TypeCode such as the one received via discovery in 
     * the BuitinPublicationTopicData or BuitinSubscriptionTopicData
     * 
     * @return A byte array containing a serialized raw-byte representation of the TypeCode
     */
    public byte[] serializeTypeCode(TypeCode typeCode) {
        int bufferSize = typeCode.get_serialized_size(0);
        CdrOutputStream stream = new CdrOutputStream(bufferSize);
        typeCode.serialize(stream);
        return stream.getBuffer().getBuffer();
    }
    
    /**
     *  This function illustrates how to create a TypeCode in Java from the serialized representation
     *  obtained by the call to serializeTypeCode(). 
     *  
     * @param serializedTypeCode an array of bytes containing the serialized representation of
     *        a TypeCode such as the one returned by serializeTypeCode()
     *        
     * @return A TypeCode that corresponds to that serialized representation
     */
    // a file or sent out-of band.
    public TypeCode deserializeTypeCode(byte[] serializedTypeCode) {
        boolean needByteSwap =
                NativeInterface.getInstance().isNativeByteOrderLittleEndian();

        CdrInputStream stream = new CdrInputStream(serializedTypeCode, needByteSwap);           
        TypeCode typeCode = 
                TypeCodeFactory.get_instance().create_tc_from_stream(stream);
        
        return typeCode;
    }
   
    /**
     * Adds the type_code into the discoveredTypes map when it is the first time that it
     * sees a type_code associated with the name type_name. It also reports errors if 
     * if a type_code is found which has the same name as another but an different 
     * definition.
     * 
     * Looks up the type_code in the discoveredTypes Map to determine if there 
     * is already one with that same type_name.
     * 
     * If no TypeCode with name type_name is found, then it is entered into discoveredTypes
     * If a TypeCode if found with type_name then the TypeCodes are compared and if they are
     * not the same it prints an error and the two types. 
     * 
     * The function returns true if the type_code was entered into the discoveredTypes map,
     * that is, if it is teh first time that the type_name was seen.
     * 
     * @param type_name  the type name associated with the type_code
     * @param type_code  the TypeCode that describes the structure of the type.
     * 
     * @return true if this is a type that was not seen before. Otherwise returns false
     */
    private boolean processType(String type_name, TypeCode type_code) {
        TypeCode existingType = null;
        
        System.out.println("Discovered type: " + type_name);
        if ( type_code == null ) {
            System.out.println("No type information was supplied for type: \"" + type_name + "\"");    
            return false;
        }
        
        // See if we already had a type with the same name:
        existingType = discoveredTypes.get(type_name);
        
        if ( existingType == null ) {
            System.out.println("This type had not seen before. Its structure is:");
            type_code.print_IDL(0);  
            discoveredTypes.put(type_name, type_code);            
            return true;
        } 
        else {
            System.out.println("This type had been seen already. Comparing the type definitions...");
            if ( existingType.equals(type_code) ) {
                System.out.println("The type matches the existing definition");
                return false;
            }
            else {
                System.out.println("The type DOES NOT match the existing definition");
                
                System.out.println("This is the existing definition:");
                existingType.print_IDL(0);  

                System.out.println("This is the definition of the type just discovered:");
                type_code.print_IDL(0); 
                return true;
            }
        }
    }

    public static void main(String args[]) throws InterruptedException {
        
        String NDDSHOME = System.getenv("NDDSHOME");  
        String DYLD_LIBRARY_PATH = System.getenv("DYLD_LIBRARY_PATH");  
        System.out.println("NDDSHOME="+ NDDSHOME);  
        System.out.println("DYLD_LIBRARY_PATH="+ DYLD_LIBRARY_PATH);  
        int domainId = 0;
        
        MonitorDicoveredTypes typesSpy = new MonitorDicoveredTypes();
        if ( !typesSpy.start(domainId) ) {
            return;
        }
        
        while (true) {
            typesSpy.waitForDiscoveryData();
            typesSpy.processDiscoveredTypes();
        }
     }
}
