/* (c) Copyright, Real-Time Innovations, 2012.  All rights reserved. 
 * Author:  Gerardo Pardo-Castellote 
 */

package com.rti.dds.snippets;

import java.util.concurrent.ConcurrentSkipListMap;

import com.rti.dds.domain.*;
import com.rti.dds.domain.builtin.ParticipantBuiltinTopicData;
import com.rti.dds.domain.builtin.ParticipantBuiltinTopicDataDataReader;
import com.rti.dds.dynamicdata.DynamicData;
import com.rti.dds.dynamicdata.DynamicDataReader;
import com.rti.dds.dynamicdata.DynamicDataSeq;
import com.rti.dds.dynamicdata.DynamicDataTypeSupport;
import com.rti.dds.infrastructure.*;
import com.rti.dds.publication.builtin.*;
import com.rti.dds.subscription.*;
import com.rti.dds.topic.Topic;
import com.rti.dds.typecode.TypeCode;
import com.rti.ndds.config.Version;

public class MonitorInformationOnSampleWriter {
    private static int verbosity  = 1;
    
    private int domainId;
    private DomainParticipant participant;
    private ParticipantBuiltinTopicDataDataReader   participantsDR;
    private PublicationBuiltinTopicDataDataReader   publicationsDR;
      
    private final static int MAX_ACTIVE_CONDITIONS = 3; // We will only install e conditions on the
    private ConditionSeq activeConditionSeq;
    private WaitSet discoveryWaitSet;
    
    // This will hold the discovered participants
    private ConcurrentSkipListMap<String, ParticipantBuiltinTopicData> discoveredParticipants;
    private ConcurrentSkipListMap<String, TypeCode> discoveredTypes;
    

    private static int byteToInt(byte b) {
        if ( b<0 ) { return b+256; }
        else return b;
    }
    
    static String locatorKind2String(Locator_t locator) {
        switch (locator.kind) {
        case Locator_t.KIND_SHMEM :
            return "SHMEM";
        case Locator_t.KIND_UDPv4 :
            return "UDPv4";
        case Locator_t.KIND_TCPV4_LAN :
            return "TCPv4 (LAN)";
        case Locator_t.KIND_TCPV4_WAN :
            return "TCPv4 (WAN)";
        case Locator_t.KIND_DTLS :
            return "DTLS";
        case Locator_t.KIND_TLSV4_LAN :
            return "TLS (LAN)";
        case Locator_t.KIND_TLSV4_WAN :
            return "TLS (WAN)";
        case Locator_t.KIND_UDPv6 :
            return "UDPv6";
       }
       
       String str = "" + locator.kind;
       return str;
    }

    public String LocatorAddress2String(Locator_t locator) {
        String str = "";
        
        switch (locator.kind) {
        case Locator_t.KIND_UDPv4:
        case Locator_t.KIND_TCPV4_LAN:
        case Locator_t.KIND_TCPV4_WAN:
        case Locator_t.KIND_TLSV4_LAN:
        case Locator_t.KIND_TLSV4_WAN:
            str = byteToInt(locator.address[12]) + "." + byteToInt(locator.address[13])+ "." 
                + byteToInt(locator.address[14]) + "." + byteToInt(locator.address[15]);
            break;
        case Locator_t.KIND_UDPv6:
            str="";
            for ( int i=0; i<16; i+=4 ) {
                str += Integer.toHexString(byteToInt(locator.address[i]))
                    +  Integer.toHexString(byteToInt(locator.address[i+1]))
                    +  Integer.toHexString(byteToInt(locator.address[i+2]))
                    +  Integer.toHexString(byteToInt(locator.address[i+3]))
                    + ":";
            }
            break;
        case Locator_t.KIND_SHMEM:
            str = "<not applicable>";
        }        

        return str;
    }

    public String LocatorSeq2String(LocatorSeq locatorSeq) {
        String str = "";
        if ( locatorSeq.size() == 0 ) {
            return "";
        }
        
        str += "[ ";
        for ( int i=0; i< locatorSeq.size(); ++i ) {
            Locator_t locator = (Locator_t)locatorSeq.get(i);
            if ( i > 0 ) {
                str += ", ";
            }
            str += "{ kind = "   + locatorKind2String(locator)
                + ", address = " + LocatorAddress2String(locator)
                + ", port= "     + locator.port
                + " }";
        }
        str += " ]";

        return str;
    }
    
    private class PrintDynamicDataListener extends DataReaderAdapter {
                
        DynamicData data = new DynamicData();
        SampleInfo  info  = new SampleInfo();

        public void on_data_available(DataReader reader) {
            DynamicDataReader dataReader =
                (DynamicDataReader)reader;
           
            PublicationBuiltinTopicData publicationData = new PublicationBuiltinTopicData();
            try {
                while ( true ) {          
                    dataReader.take_next_sample(data, info); 
                    if (info.valid_data) {
                        System.out.println("Receive data from DataWriter...");
                        dataReader.get_matched_publication_data(publicationData, info.publication_handle);
                           
                        // publicationData.participant_key contains GUID of the Participant that wrote the sample
                        ParticipantBuiltinTopicData participantData = 
                            discoveredParticipants.get(publicationData.participant_key.toString());
  
                        if ( participantData == null ) {
                            System.out.println("PrintDynamicDataListener: Could not find Participant with key:" 
                                    + publicationData.participant_key);
                        }
                        System.out.println("... source Address is: " 
                                + LocatorSeq2String(participantData.metatraffic_unicast_locators) );
                        
                     }
                }
            } catch (RETCODE_NO_DATA noData) {
                // No data to process
            } finally { }
        }
    }

    private boolean start(int theDomainId) {        
        Version version = Version.get_instance ();
        System.out.println("Running RTI Connext version: " + version);
        
        discoveredParticipants = new ConcurrentSkipListMap<String, ParticipantBuiltinTopicData>();
        discoveredTypes = new ConcurrentSkipListMap<String, TypeCode>();

        domainId = theDomainId;
        DomainParticipantFactory factory = DomainParticipantFactory.get_instance();
        DomainParticipantFactoryQos factoryQos = new DomainParticipantFactoryQos();
        
        // This instructs the DomainParticipantFactory to not enable the DomainParticipant
        // entities it creates automatically. This is needed so that we have a chance to
        // retrieve the builtin data-readers before the participant starts receiving
        // discovery data. Later it is explained why this is needed
        factoryQos.entity_factory.autoenable_created_entities = false;
        
        DomainParticipantQos pQos = new DomainParticipantQos();
        factory.get_default_participant_qos(pQos);
        pQos.participant_name.name = "RTI Connext Monitor Types Snippet";
        try {
            participant = factory.create_participant(
                    domainId, 
                    pQos, //DomainParticipantFactory.PARTICIPANT_QOS_DEFAULT, 
                    null, // listener
                    StatusKind.STATUS_MASK_NONE);
        } catch ( Exception e) {
            String lastStartError = "Error creating the DDS domain. Common causes are:"
                + "\n   - Lack of a network. E.g disconected wireless."
                + "\n   - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface "
                + "\n      for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead.";
            
            System.out.println(lastStartError);
            return false;
        }
   
        // We count ourselves as a participant that is present

        // The "lookup_xxx" operations not only retrieve the built-in data-readers but also
        // activate the caching of discovered types. To save resources discovered types
        // are only saved in the built-in reader cache which is only active after calling
        // the corresponding "lookup_xxx" operation. 
        // It is for this reason that we instructed the DomainParticipantFactory to not
        // automatically enable the DomainParticipant. This gives us the opportunity to
        // retrieve the built-in entities and activate the caching of discovered types
        // before we receive any discovery information. If we did not do this we may
        // miss the data-types of the first few entities discovered        
        participantsDR = (ParticipantBuiltinTopicDataDataReader)
            participant.get_builtin_subscriber().lookup_datareader("DCPSParticipant");
         
        publicationsDR = (PublicationBuiltinTopicDataDataReader) 
            participant.get_builtin_subscriber().lookup_datareader("DCPSPublication");
         
        
        // Enable the participant. This causes it to start receiving discovery traffic.  
        // Note: Enable fails if there is no network of if any interfaces are not multicast enabled
        // In some platforms (e.g. MacOSX) having the VPN running causes enable() to fail
        try {
            participant.enable(); 
        } catch ( Exception e) {
            String lastStartError = "Error enabling the DDS domain. Common causes are:"
                + "\n   - Lack of a network. E.g disconected wireless."
                + "\n   - A network interface that does not bind multicast addresses. In some platforms enabling using the TUN interface "
                + "\n      for (Open)VPN causes this. If this is your situation try configure (Open)VPN to use TAP instead.";
            
            System.out.println(lastStartError);
            return false;
        }
        
        // Create a WairSet object that can be used to block the calling thread until there is
        // discovery data to read. This avoids having to poll and this use CPU continuously. 
        discoveryWaitSet = new WaitSet();
        
        // Attach the conditions that would wakeup the waitset. In this case the arrival of data on
        // any of the built-in datareaders      
        discoveryWaitSet.attach_condition(participantsDR.get_statuscondition());
        participantsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);

        discoveryWaitSet.attach_condition(publicationsDR.get_statuscondition());
        publicationsDR.get_statuscondition().set_enabled_statuses(StatusKind.DATA_AVAILABLE_STATUS);
                
        activeConditionSeq = new ConditionSeq(MAX_ACTIVE_CONDITIONS);

        return true;
    }

    private void waitForDiscoveredDataWriters() {
        Duration_t waitDuration = 
            new Duration_t(Duration_t.DURATION_INFINITE_SEC, Duration_t.DURATION_INFINITE_NSEC);
        
        System.out.println("waitForDiscoveredDataWriters");
        try {
            discoveryWaitSet.wait(activeConditionSeq, waitDuration);
        } catch (RETCODE_TIMEOUT timeoutRetcode) {}
    }
  
    public void processDiscoveredParticipants() {
        ParticipantBuiltinTopicData participantData = new ParticipantBuiltinTopicData();;
        SampleInfo info = new SampleInfo();;
        
        try {
            while ( true ) {          
                participantsDR.take_next_sample(participantData, info);

                if ( info.instance_state == InstanceStateKind.ALIVE_INSTANCE_STATE ) {
                    // Add the participant information to the 
                    System.out.println("Participant (New)" +
                            " messageNum: " + info.reception_sequence_number.low +
                            " name: \"" + participantData.participant_name.name + "\"" +
                            " created at: " + info.source_timestamp +
                            " detected at: " + info.reception_timestamp +
                            " participant_key: " + participantData.key.toString());
                    discoveredParticipants.put(participantData.key.toString(), participantData);
                } else {
                    String dissapearReason;
                    if ( info.instance_state == InstanceStateKind.NOT_ALIVE_DISPOSED_INSTANCE_STATE  ) {
                        dissapearReason = "deleted";
                    } else {
                        dissapearReason = "lost connection";
                    }     
                    if ( info.valid_data ) {
                        System.out.println("Participant (Dissapeared - "+ dissapearReason +"):" +
                                " messageNum: "  + info.reception_sequence_number.low +
                                " name: \"" + participantData.participant_name.name + "\"" +
                                " detected at: " + info.reception_timestamp +
                                " participant_key: " + participantData.key);
                    }
                    else {
                        System.out.println("Participant (Dissapeared - "+ dissapearReason +"):" +
                                " messageNum: "  + info.reception_sequence_number.low +
                                " source sn: " + info.publication_sequence_number.low +
                                " handle: " + info.instance_handle.toString() +
                                " detected at: " + info.reception_timestamp );                     
                    }
                    
                    // Find the participant that we had saved
//                    Commented out because for some reason participantsDR.get_key_value throws exception
//                    participantsDR.get_key_value(participantData, info.instance_handle);
//                    if ( discoveredParticipants.remove(participantData.key.toString()) == null) {
//                        System.out.println("Participant with key: " 
//                                + participantData.key.toString()
//                                + "was not in the list\n");
//                    }        
                    
                }
            }

        } 
        catch (RETCODE_NO_DATA noData) {  } 

        finally {  }
    }


    
    private void processDiscoveredDataWriters() {
        System.out.println("processDiscoveredDataWriters");
        if ( publicationsDR.get_statuscondition().get_trigger_value() ) {
            processTypesInDiscoveredDataWriters();
        }
    }

    private void processTypesInDiscoveredDataWriters() {
        PublicationBuiltinTopicData publicationData = new PublicationBuiltinTopicData();
        SampleInfo info = new SampleInfo();
       
        try {
            while ( true ) {
                publicationsDR.take_next_sample(publicationData, info);
                
                // We are only interested in new DataWriters which we have not
                // seen before. This is because the data-type will never change for any
                // one DataWriter to other updates indicating change of QoS or deletion of the
                // DataWriter can be safely ignored
                if ( info.view_state == ViewStateKind.NEW_VIEW_STATE ) {
                   System.out.println("DataWriter (New)" +
                            " name: \""  + publicationData.publication_name.name + "\"" +
                            " topic: \"" + publicationData.topic_name + "\"" +
                            " type: \""  + publicationData.type_name + "\"" );

                   processDiscoveredTopic(publicationData.topic_name, 
                           publicationData.type_name, publicationData.type_code);
                }
             }
        }
        catch (RETCODE_NO_DATA noData) {    } 
        // catch (RETCODE_BAD_PARAMETER badParam) {   }
        finally {  }        
    }

    private void processDiscoveredTopic(String topic_name, String type_name, TypeCode type_code) {
        TypeCode existingType = null;
        DynamicDataTypeSupport typeSupport = null;
        
        System.out.println("Discovered topic: \"" + topic_name + "\"  with type: \"" + type_name + "\"");
        if ( type_code == null ) {
            System.out.println("No type information was supplied for type: \"" + type_name + "\"");    
            return;
        }
        
        // See if we already had a type with the same name:
        existingType = discoveredTypes.get(type_name);
        
        if ( existingType == null ) {
            System.out.println("This type had not seen before. Its structure is:");
            type_code.print_IDL(0);  
            discoveredTypes.put(type_name, type_code);
            
            // register the discovered type with the Participant
            typeSupport = new DynamicDataTypeSupport(type_code, DynamicDataTypeSupport.TYPE_PROPERTY_DEFAULT);
            typeSupport.register_type(participant, type_name);
        } 
        else {
            System.out.println("This type had been seen already. Comparing the type definitions...");
            if ( existingType.equals(type_code) ) {
                System.out.println("The type matches the existing definition");
            }
            else {
                System.out.println("The type DOES NOT match the existing definition");
                
                System.out.println("This is the existing definition:");
                existingType.print_IDL(0);  

                System.out.println("This is the definition of the type just discovered:");
                type_code.print_IDL(0);                
            }
        }
        
        // Check if we had already a topic created with that name
        if ( this.participant.lookup_topicdescription(topic_name) == null ) {
            System.out.println("This topic \"" + topic_name + "\" had not seen before. Creating it.");
            
            // Topic was not known. Create a Topic and DataReader
            try {
                Topic topic = participant.create_topic(
                        topic_name, type_name, 
                        DomainParticipant.TOPIC_QOS_DEFAULT,
                        null /* listener */, StatusKind.STATUS_MASK_NONE);
                
                PrintDynamicDataListener listener = new PrintDynamicDataListener();
                participant.create_datareader(
                        topic, Subscriber.DATAREADER_QOS_DEFAULT,
                        listener, StatusKind.DATA_AVAILABLE_STATUS);
               
            }
            catch ( Exception e ) {
                System.out.println("creation of Topic and DataReader failed with execption: " + e) ;
            }           
        }
    }


    public static void main(String args[]) throws InterruptedException {
        
        String NDDSHOME = System.getenv("NDDSHOME");  
        String DYLD_LIBRARY_PATH = System.getenv("DYLD_LIBRARY_PATH");  
        System.out.println("NDDSHOME="+ NDDSHOME);  
        System.out.println("DYLD_LIBRARY_PATH="+ DYLD_LIBRARY_PATH);  
        int domainId = 0;
        
        MonitorInformationOnSampleWriter dataSpy = new MonitorInformationOnSampleWriter();
        if ( !dataSpy.start(domainId) ) {
            return;
        }
        
        while (true) {
            dataSpy.waitForDiscoveredDataWriters();
            dataSpy.processDiscoveredParticipants();
            dataSpy.processDiscoveredDataWriters();
        }
     }

 
}
