/*
 * Process 0 envoie des messages compresses (de taille blocksize)
 * les recepteurs decompressent. C'est un BCAST MPI qui est mesure'
 * ou un multicast (voir les options de compilation plus bas)
 *
 * mpicc -O4 testMulticastBcastMPIwithTC.c -l z
 * si les librairies zlib sont deja installees chez vous
 *
 * Execution : mpirun -np 3 a.out 224.255.10.10 9000
 *
 * mpicc -O4 testMulticastBcastMPIwithTC.c -U TC -l z
 * si les librairies zlib sont deja installees chez vous et que
 * vous ne voulez PAS utiliser TC (traffic Control)
 * Pour configurer les parametres de TC voir :
 * #define MACHINEFILE "/scratch/ccerin/machinefile"
 * #define PathTC "/sbin/"
 * #define OUTGOINGTRAFFIC  2000	//in kbit/s please ! 
 *
 *
 * Vous pouvez aussi compiler comme cela pour ne pas avoir 
 * de compression du tout:
 * mpicc -O4 testMulticastBcastMPIwithTC.c -D NO_COMPRESSION -l z
 *
 * Vous pouvez aussi compiler comme cela pour ne pas avoir 
 * de compression et utiliser le multicast:
 * mpicc -O4 testMulticastBcastMPIwithTC.c -D NO_COMPRESSION -D MULTICAST -l z
 *
 * Vous pouvez aussi compiler comme cela pour ne pas avoir 
 * de compression et utiliser le bcast mpi:
 * mpicc -O4 testMulticastBcastMPIwithTC.c -D NO_COMPRESSION -U MULTICAST -l z
 *
 * Vous pouvez aussi compiler comme cela pour utiliser TC (Traffic Control)
 * mpicc -O4 testMulticastBcastMPIwithTC.c -D TC -l z
 * Note: generalement la commande tc est installee sous /sbin et tc fait
 * partie de iproute2
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <math.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <fcntl.h>
#include <unistd.h>
#include <zlib.h>
#include <mpi.h>
#include <sched.h>
#include <string.h>
#include <math.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

#include <sys/types.h>		/* for type definitions */
#include <sys/socket.h>		/* for socket API function calls */
#include <netinet/in.h>		/* for address structs */
#include <arpa/inet.h>		/* for sockaddr_in */
#include <stdio.h>		/* for printf() */
#include <stdlib.h>		/* for atoi() */
#include <string.h>		/* for strlen() */
#include <unistd.h>		/* for close() */

//#define MAX_LEN  1024         /* maximum string size to send */
/* Accepted range for the multicast port given on the command line. */
#define MIN_PORT 1024		/* minimum port allowed */
#define MAX_PORT 65535		/* maximum port allowed */

// Maximum size, in bytes, of one datagram on the multicast path.
// 2048 is only the initial value: main() overwrites it with a value
// parsed from the output of "/sbin/sysctl net.ipv4.tcp_wmem".
int MAX_MSG_SZ=2048;

/*
 * Traffic-control settings; only compiled in when the program is built
 * with -D TC.  They are consumed by mynetwork() and by the clean-up code
 * at the end of main().
 */
#ifdef TC

  // Do your own settings here
  // Destination port matched by the "tc filter" rule built in mynetwork().
#define MYPORT 22

  // Christophe
  // MACHINEFILE: file read line by line by mynetwork(), one host name per line.
  // PathTC: directory prefix put in front of every "tc" command.
#define MACHINEFILE "/scratch/ccerin/machinefile"
#define PathTC "/sbin/"

  // Used as both the "rate" and the "ceil" of each htb class.
#define OUTGOINGTRAFFIC  2000	/* in kbit/s please ! */

#endif

/*
 * Number of 100-byte records (Item) generated and transmitted in each
 * experiment: main() allocates BLOCKSIZE items and asks DataGeneration()
 * for BLOCKSIZE records.
 */
#define BLOCKSIZE (2048*100)

/* Compression level passed to compress2() in main() */
#define COMPRESSION_LEVEL 1
/* Number of experiments: how many times main() repeats the measured transfer */
#define NB_EXPERIENCE 12

/*
 * CHECK_ERR(err, msg): terminate the program when a zlib call failed.
 *
 *   err  zlib return code to test (evaluated exactly once)
 *   msg  name of the failing operation, printed in front of the code
 *
 * When err is Z_OK nothing happens.  Otherwise the code is reported on
 * stderr, followed by an explanation for the codes this program expects,
 * and the process exits with status 1.
 *
 * The body is wrapped in do { } while (0) so that "CHECK_ERR (...);" is a
 * single statement and can be used safely inside an if/else.
 */
#define CHECK_ERR(err, msg) \
  do { \
    const int check_err_code_ = (err); \
    if (check_err_code_ != Z_OK) { \
      fprintf (stderr, "%s error: %d\n", (msg), check_err_code_); \
      switch (check_err_code_) { \
      case Z_MEM_ERROR: fprintf (stderr, "not enough memory\n"); break; \
      case Z_BUF_ERROR: fprintf (stderr, "not enough memory in the output buf\n"); break; \
      case Z_DATA_ERROR: fprintf (stderr, "input data corrupted\n"); break; \
      case Z_STREAM_ERROR: fprintf (stderr, "level parameter is invalid\n"); break; \
      default: break; \
      } \
      exit (1); \
    } \
  } while (0)

#define PACKET     1024		/* number of integers per message */

/*
 * One benchmark record as produced by gen_rec(): a sort key, a record
 * number field and a text field, all stored as raw characters.
 */
typedef struct record
{
  char sortkey[10];
  char recnum[20];
  char txtfld[70];
} RECORD;

/*
 * Reduced record: a sort key together with an integer "permut" field.
 */
typedef struct record_simple
{
  char sortkey[10];
  int permut;
} ItemSimple;

//
// GLOBALS inherited from the SortGen data generator
//
// Size in bytes of one record; DataGeneration() uses it as the size of
// its temporary record buffer.
int RecordSize = 100;
//sizeof(struct record);
//100 bytes
// 1 << 0 is 1: one record per buffer (older comments mentioning 64 K
// records per buffer no longer match this value).
int RecordsPerBuffer = 1 << 0;
// 100 * (1 << 0) is 100 bytes, i.e. exactly one record.
const int BufferSize = 100 * (1 << 0);

// Number of records to generate.
// NOTE(review): not referenced by the code visible in this file.
long long MaxRecord = 1;
// Name of the file to create.
// NOTE(review): only mentioned in commented-out code in this file.
char *OutFileName = "/scr/hfkaier/temp.dat";
//Name of file to create


#define LNAME 19
#define FNAME  "/tmp/_sort%03d.dat"
int MaxInsertions = 5;

/* One record handled as an opaque array of 100 bytes. */
typedef char Item[100];


/*
 * Small sorting helpers.  Every argument is parenthesised and the
 * multi-statement macros are wrapped in do { } while (0) so that they
 * behave like ordinary statements.
 *
 * less(A, B)  non-zero when A < B.
 * exch(A, B)  exchange the contents of two Item objects.  Item is an array
 *             type, so it cannot be copied with '='; memcpy() is used.
 * swap(A, B)  exchange two int lvalues.
 */
#define   less(A, B)  ((A) < (B))
#define   exch(A, B) \
  do { \
    Item exch_tmp_; \
    memcpy (exch_tmp_, (A), sizeof (Item)); \
    memcpy ((A), (B), sizeof (Item)); \
    memcpy ((B), exch_tmp_, sizeof (Item)); \
  } while (0)
#define   swap(A, B) \
  do { \
    int swap_tmp_ = (A); \
    (A) = (B); \
    (B) = swap_tmp_; \
  } while (0)

/*
 * File-scope variables; none of the three is referenced by the code
 * visible in this file.
 */
static int nNodes;		/* number of nodes for selection tree */
static char *ofName;		/* output filename */
static char *ifName;		/* input filename */

/*
 * Configure the outgoing traffic of the local machine with tc (iproute2).
 *
 * machinefile: path of a text file holding one host name per line.
 *
 * When the program is built with -D TC the function
 *   1. resets the root and ingress queueing disciplines of eth0 and
 *      installs an htb root qdisc;
 *   2. resolves the local host name (output of "uname -n");
 *   3. for every host of the machine file whose address differs from the
 *      local one, adds an htb class limited to OUTGOINGTRAFFIC kbit/s and
 *      a filter sending the traffic for that host and port MYPORT to it.
 * Fatal problems (popen, local name resolution, machine file) end the
 * process with exit(1); a host that cannot be resolved is only reported.
 *
 * Without -D TC the function does nothing.
 */
void
mynetwork (char *machinefile)
{
#ifdef TC
  FILE *fp, *pp;
  char buf[256];
  char order[256];
  char commandeTC[256];
  struct hostent *myhost;
  char **p;
  struct in_addr Me, OtherHost;
  int classid;

  /* Defined values even if a host entry carries no address at all. */
  memset (&Me, 0, sizeof (Me));
  memset (&OtherHost, 0, sizeof (OtherHost));

  /*
   * Reinitialize the queues.  The results of system() are deliberately
   * ignored: the "del" commands fail when nothing was configured before.
   */
  snprintf (commandeTC, sizeof (commandeTC), "%stc qdisc del dev eth0 root",
            PathTC);
  system (commandeTC);
  snprintf (commandeTC, sizeof (commandeTC),
            "%stc qdisc del dev eth0 ingress", PathTC);
  system (commandeTC);
  snprintf (commandeTC, sizeof (commandeTC),
            "%stc qdisc add dev eth0 handle ffff: ingress", PathTC);
  system (commandeTC);
  snprintf (commandeTC, sizeof (commandeTC),
            "%stc qdisc add dev eth0 root handle 1: htb", PathTC);
  system (commandeTC);

  /* Keep some information about me (localhost) */
  if ((pp = popen ("uname -n", "r")) == NULL)
    {
      perror ("popen error");
      exit (1);
    }
  if (fgets (buf, sizeof (buf), pp) == NULL)
    {
      fprintf (stderr, "unable to read the local host name\n");
      pclose (pp);
      exit (1);
    }
  /* Cut at the newline if there is one; never index before the buffer. */
  buf[strcspn (buf, "\n")] = '\0';
  pclose (pp);

  myhost = gethostbyname (buf);
  if (myhost == NULL)
    {
      perror ("gethostbyname-1");
      exit (1);
    }
  /* Print every local address; the last one is kept in Me. */
  for (p = myhost->h_addr_list; *p != 0; p++)
    {
      memcpy (&Me.s_addr, *p, sizeof (Me.s_addr));
      printf ("%s\t%s", inet_ntoa (Me), myhost->h_name);
      putchar ('\n');
    }

  /*
   * Read the machine file containing all the machines connected
   * to me and configure them (except me)
   */
  fp = fopen (machinefile, "r");
  if (fp == NULL)
    {
      perror ("Problem in opening MachineFile\n");
      exit (1);
    }
  classid = 1;
  while (fgets (buf, sizeof (buf), fp) != NULL)
    {
      char chOther[100];
      int have_address = 0;

      buf[strcspn (buf, "\n")] = '\0';
      printf ("|%s|\n", buf);
      myhost = gethostbyname (buf);
      if (myhost == NULL)
        {
          perror ("gethostbyname-2 error...");
          continue;
        }

      /* Print every address of the host; the last one is kept. */
      for (p = myhost->h_addr_list; *p != 0; p++)
        {
          memcpy (&OtherHost.s_addr, *p, sizeof (OtherHost.s_addr));
          printf ("%s\t%s", inet_ntoa (OtherHost), myhost->h_name);
          putchar ('\n');
          have_address = 1;
        }
      if (!have_address)
        continue;               /* nothing to match a filter against */

      if (OtherHost.s_addr != Me.s_addr)
        {
          /*
           * src and dst are different.  Only the outgoing traffic is
           * shaped: each node is supposed to configure its own.
           * inet_ntoa() returns a static buffer, hence the private copy.
           */
          snprintf (chOther, sizeof (chOther), "%s", inet_ntoa (OtherHost));

          snprintf (order, sizeof (order),
                    "%stc class add dev eth0 parent 1: classid 1:%d htb rate %dkbit ceil %dkbit",
                    PathTC, classid, OUTGOINGTRAFFIC, OUTGOINGTRAFFIC);
          system (order);

          /*
           * All traffic to chOther and port MYPORT is matched
           * (0xffff is the port mask).
           */
          snprintf (order, sizeof (order),
                    "%stc filter add dev eth0 parent 1: protocol ip prio 16 u32 match ip dst %s match ip dport %d 0xffff flowid 1:%d",
                    PathTC, chOther, MYPORT, classid++);
          system (order);
        }
      else
        printf
          ("---I do not configure the network between me and me-----\n");
    }
  fclose (fp);
#else
  (void) machinefile;           /* nothing to configure without TC */
#endif
}

/********************* DATA GENERATION **********************************

    Creator: Tom Barclay MSR

    Date: 1/17/96

    Copyright (c) 1995, 1996, 1998 Microsoft Corporation, All Right Reserved

****************************************************************************/

/*
    SortGen.EXE :  Sort Benchmark data generator

    This is a console application.
    It generates 100 byte records.
        The first 10 bytes is a random key of 10 printable characters.
        The next 10 bytes is the record number,0,1,... with leading blanks.
         The remaining bytes contain varying fields of
            "AAAAAAAAAA", "BBBBBBBBBB", ... "ZZZZZZZZZZ".
            The series repeats until it fills the record.
            The next record picks up where we left off on the previous record.

    This is the command syntax:

        SortGen.exe NumberOfRecordsToGenerate  OutputFileName

    Example generating 100mb

        SortGen.exe 1000000 sort100mb.dat

    Modification History:
    JNG         04/29/98    Modified Barclay's code to use stdio.h.
    Peng Liu    12/07/02    Modified Barclay's code to generate 100-bytes records instead of 101-bytes records.
    JNG         01/19/04    Modified Peng Liu code to use long long (64bit int) to handle more than 2 billion records.

    historical note: LIMITATIONS:  Program works up to 2^31 records (about 200GB), before 31 bit record counter overflows.
					  We hope this will be a problem someday.
    This was a problem in 2004, and the problem was removed by using long-long datatype.
*/



#define  VERSION    "V1.1-4"

/*************************************************************************
 * format of record: 10 bytes of key,                                    *
 *                   20 bytes of serial number,                          *
 *                   70 bytes of data                                    *
 *************************************************************************/


/*************************************************************************
 *                                                                       *
 * My random number generator (I trust it).                              *
 *                                                                       *
 *************************************************************************/
/*
 * Shared time stamps.  chrono1 is written by DataGeneration() (its
 * microsecond field seeds the random generator) and by main() as the start
 * of a measurement; chrono2 is the end of a measurement.  tempsz is only
 * passed as the time-zone argument of gettimeofday().
 */
struct timeval chrono1, chrono2;
struct timezone tempsz;

/*
 * Return the next value of the generator.
 *
 * The state is a function-local static that starts at zero; each call
 * multiplies it by a fresh random() value, adds 663896637 and returns the
 * result truncated to unsigned.
 */
unsigned
my_rand (void)
{
  static unsigned state;

  /* The original SortGen multiplier was the constant 3141592621. */
  state = state * random () + 663896637;
  return state;
}

/*
 * Fill key[0..9] with printable characters taken from ' ' to '~'.
 *
 * Three values are drawn from my_rand().  After the low-order bits are
 * divided away, each value is written out as base-95 digits, least
 * significant digit first, from the highest index of its group downwards:
 * key[3..0], then key[7..4], then key[9..8].
 */
void
rand_key (char key[10])
{
  /* last index of the group, number of digits, initial divisor */
  static const struct
  {
    int last;
    int digits;
    unsigned divisor;
  } groups[] = {
    {3, 4, 52},
    {7, 4, 52},
    {9, 2, 52 * 95 * 95},
  };
  unsigned g;

  for (g = 0; g < sizeof (groups) / sizeof (groups[0]); g++)
    {
      unsigned bits = my_rand ();
      int k;

      bits /= groups[g].divisor;        /* filter out lower order bits */
      for (k = 0; k < groups[g].digits; k++)
        {
          key[groups[g].last - k] = ' ' + (bits % 95);
          bits /= 95;
        }
    }
}

/*
 * Generate one record in *rp.
 *
 *   sortkey  10 random printable characters (rand_key()).
 *   recnum   running record number, right-aligned in 20 characters,
 *            NOT NUL-terminated.
 *   txtfld   7 runs of 10 identical letters; the letter advances from
 *            'A' to 'Z' and wraps, continuing across calls.  The last two
 *            bytes are then overwritten with '\r' and '\0'.
 *
 * The record counter and the current letter are function-local statics,
 * so successive calls continue where the previous one stopped.
 */
void
gen_rec (struct record *rp)
{
  static int current;
  static char nxtchar = 'A';
  char recnum_text[32];
  char *sptr;
  int i, j;

  /* Get 10 byte sort key */
  rand_key (rp->sortkey);

  /*
   * "%20d" yields 20 characters plus a terminating NUL, one byte more than
   * recnum can hold.  Format into a scratch buffer and copy only the 20
   * characters, so nothing is written past the end of the field.
   */
  snprintf (recnum_text, sizeof (recnum_text), "%20d", current++);
  memcpy (rp->recnum, recnum_text, sizeof (rp->recnum));

  sptr = rp->txtfld;
  for (i = 0; i < 7; i++)
    {
      for (j = 0; j < 10; j++)
        *sptr++ = nxtchar;
      nxtchar++;
      if (nxtchar > 'Z')
        nxtchar = 'A';
    }
  /* sptr is one past the end of txtfld: terminate the record. */
  sptr[-2] = '\r';
  sptr[-1] = '\0';
}





/*************************************************************************
 *                                                                       *
 * Main routine of data generator                                        *
 *                                                                       *
 *************************************************************************/
/*
 * Fill tab[0 .. DesiredRecords-1] with freshly generated records.
 *
 *   DesiredRecords  number of records to generate
 *   tab             destination array, at least DesiredRecords items long
 *   mypid           not used by the function; kept for the callers
 *
 * Returns 0 (the status code; no failure is possible any more because the
 * scratch record now lives on the stack instead of the heap).
 *
 * Side effects: the global chrono1 receives the current time and its
 * microsecond field seeds random(), so that every call produces a
 * different sequence.  Code that reads chrono1 after this call sees the
 * time at which the generation started.
 */
int
DataGeneration (long long DesiredRecords, Item * tab, int mypid)
{
  RECORD rec;                   /* scratch record, copied into tab[] */
  long long generated;

  (void) mypid;

  /* Different seed on every call; otherwise the sequence never changes. */
  gettimeofday (&chrono1, &tempsz);
  srandom ((unsigned int) (chrono1.tv_usec));

  for (generated = 0; generated < DesiredRecords; generated++)
    {
      gen_rec (&rec);
      /* A record and an Item are both 100 bytes of raw characters. */
      memcpy (&tab[generated], &rec, sizeof (Item));
    }

  return 0;
}


/* NOTE(review): not referenced by any code visible in this file. */
int MYfpOUT;

/***********************************************************************/
int
main (int argc, char *argv[])
{
  char cc[100];
  float f;
  int NbIteration, i, j, m, micro, second, *per;
  Bytef *cloneTab;
  Item *tab;
  char c = 'z';
  uLongf destLen, Len;
  int err;
  char CommandeTC[256];
  long long s = 0;
  MPI_Status status;
  int nprocs, mypid, taille;

  int sock;			/* socket descriptor */
  char *send_str;		/* string to send */
  struct sockaddr_in mc_addr;	/* socket address structure */
  unsigned int send_len;	/* length of string to send */
  char *mc_addr_str;		/* multicast IP address */
  unsigned short mc_port;	/* multicast port */
  unsigned char mc_ttl = 1;	/* time to live (hop count) */

  int flag_on = 1;		/* socket option flag */
  int recv_len;			/* length of string received */
  struct ip_mreq mc_req;	/* multicast request structure */
  struct sockaddr_in from_addr;	/* packet source */
  unsigned int from_len;	/* source addr length */

  char buf[256], *send_buffer, *recv_buffer;
  FILE *pp;
  char *car;


  static const char *myVersion = ZLIB_VERSION;

  MPI_Init (&argc, &argv);
  MPI_Comm_rank (MPI_COMM_WORLD, &mypid);
  MPI_Comm_size (MPI_COMM_WORLD, &nprocs);
  MPI_Barrier (MPI_COMM_WORLD);

  if (zlibVersion ()[0] != myVersion[0])
    {
      fprintf (stderr, "incompatible zlib version\n");
      exit (1);

    }
  else if (strcmp (zlibVersion (), ZLIB_VERSION) != 0)
    {
      fprintf (stderr, "warning: different zlib version\n");
    }

  if (mypid == 0)
    {
      printf ("zlib version %s = 0x%04x, compile flags = 0x%lx\n",
	      ZLIB_VERSION, ZLIB_VERNUM, zlibCompileFlags ());
    }

  /* validate number of arguments */
  if (argc != 3)
    {
      fprintf (stderr,
	       "Usage: %s <Multicast IP> <Multicast Port>\n", argv[0]);
      fprintf (stderr, "mpirun -np 3 a.out 224.255.10.10 9000\n");
      exit (1);
    }

  /*
   * Seul le noeud 0 est considéré comme l'emetteur et c'est donc
   * pour lui que l'on configure le traffic de sortie
   */
#ifdef TC
  if (mypid == 0)
    {
      printf ("Debut config du reseau\n");
      mynetwork (MACHINEFILE);
    }
#endif


  // per = calloc(BLOCKSIZE, sizeof(int));

  tab = (Item *) calloc (BLOCKSIZE, sizeof (Item));
  cloneTab =
    (Bytef *) calloc ((BLOCKSIZE * sizeof (Item)) +
		      (int) ((BLOCKSIZE * sizeof (Item)) / 10) + 12, 1);

  if (tab == NULL)
    {
      printf ("A- unable to alloc %ld elements for tab[]",
	      BLOCKSIZE * sizeof (Item));
      exit (0);
    }
  if (cloneTab == NULL)
    {
      printf ("A- unable to alloc %ld elements for cloneTab[]",
	      BLOCKSIZE * sizeof (Item) +
	      (int) ((BLOCKSIZE * sizeof (Item)) / 10) + 12);
      exit (0);
    }

  /********* SETTING MAX MSG SZ HERE ***********/
  if ((pp = popen ("/sbin/sysctl net.ipv4.tcp_wmem", "r")) == NULL)
    {
      perror ("popen error");
      exit (1);
    }

  while(fgets (buf, sizeof (buf), pp) != NULL) {
    buf[strlen (buf) - 1] = '\0';
    car=strtok(buf,"=");
    car=strtok(NULL,"=");
  }
  // Attention il y a un effet de bord avec le atoi() car
  // la boucle renvoie une chaine avec 3 valeurs
  // et on prend la premiere valeur :
  // $ /sbin/sysctl net.ipv4.tcp_wmem
  // $ net.ipv4.tcp_wmem = 4096163844194304

  //MAX_MSG_SZ=1024;
  MAX_MSG_SZ=atoi(car);

  printf("MAX_MSG_SZ = %d\n",MAX_MSG_SZ); /* le resultat est ici */

  send_buffer = calloc (MAX_MSG_SZ, 1);

  if (send_buffer == NULL)
    {
      printf ("A- unable to alloc %ld elements for send_buffer[]", MAX_MSG_SZ);
      exit (0);
    }

  recv_buffer = calloc (MAX_MSG_SZ, 1);

  if (recv_buffer == NULL)
    {
      printf ("A- unable to alloc %ld elements for recv_buffer[]", MAX_MSG_SZ);
      exit (0);
    }

  /***********************************************/


  f = 0.0;
  s = 0;
  for (i = 1; i <= NB_EXPERIENCE; i++)
    {

      // HYPER IMPORTANT pour usage correct de compress2
      destLen = (BLOCKSIZE * sizeof (Item)) +
	(BLOCKSIZE * sizeof (Item)) / 10 + 12;
      Len = destLen;
      if (mypid == 0)
	{
	  printf ("Debut generation donnees\n");
	  DataGeneration (BLOCKSIZE, tab, BLOCKSIZE);
	  printf ("Fin generation donnees\n");
#ifndef NO_COMPRESSION
	  printf ("Debut de la compression %d/%d\n", i, NB_EXPERIENCE);
	  gettimeofday (&chrono1, &tempsz);
	  err =
	    compress2 ((Bytef *) cloneTab, &destLen, (Bytef *) tab,
		       (uLong) (BLOCKSIZE * sizeof (Item)),
		       COMPRESSION_LEVEL);
	  //printf("PID 0, taille compressee=%lu\n",destLen);
	  CHECK_ERR (err, "compress2");
	  // on somme les tailles pour estimer le taux de compression moyen
	  s += destLen;
#else
	  // on somme les tailles pour estimer le taux de compression moyen
	  s += BLOCKSIZE * sizeof (Item);
#endif
	}

#ifndef NO_COMPRESSION
      // On doit broadcaster la taille du buffer compresse.
      MPI_Bcast (&destLen, 1, MPI_INT, 0, MPI_COMM_WORLD);
#endif

      /*
       * Receiving first because we have to wait!
       */

#ifdef MULTICAST
      if (mypid != 0)
	{

	  mc_addr_str = argv[1];	/* arg 1: multicast ip address */
	  mc_port = atoi (argv[2]);	/* arg 2: multicast port number */

	  /* validate the port range */
	  if ((mc_port < MIN_PORT) || (mc_port > MAX_PORT))
	    {
	      fprintf (stderr, "Invalid port number argument %d.\n", mc_port);
	      fprintf (stderr, "Valid range is between %d and %d.\n",
		       MIN_PORT, MAX_PORT);
	      exit (1);
	    }

	  /* create socket to join multicast group on */
	  if ((sock = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
	    {
	      perror ("socket() failed");
	      exit (1);
	    }

	  /* set reuse port to on to allow multiple binds per host */
	  if ((setsockopt (sock, SOL_SOCKET, SO_REUSEADDR, &flag_on,
			   sizeof (flag_on))) < 0)
	    {
	      perror ("setsockopt() failed");
	      exit (1);
	    }

	  /* construct a multicast address structure */
	  memset (&mc_addr, 0, sizeof (mc_addr));
	  mc_addr.sin_family = AF_INET;
	  mc_addr.sin_addr.s_addr = htonl (INADDR_ANY);
	  mc_addr.sin_port = htons (mc_port);

	  /* bind to multicast address to socket */
	  if ((bind (sock, (struct sockaddr *) &mc_addr, sizeof (mc_addr))) <
	      0)
	    {
	      perror ("bind() failed");
	      exit (1);
	    }

	  /* construct an IGMP join request structure */
	  mc_req.imr_multiaddr.s_addr = inet_addr (mc_addr_str);
	  mc_req.imr_interface.s_addr = htonl (INADDR_ANY);

	  /* send an ADD MEMBERSHIP message via setsockopt */
	  if ((setsockopt (sock, IPPROTO_IP, IP_ADD_MEMBERSHIP,
			   (void *) &mc_req, sizeof (mc_req))) < 0)
	    {
	      perror ("setsockopt() failed");
	      exit (1);
	    }

	  /* block waiting to receive a packet */
#ifndef NO_COMPRESSION
	  taille = destLen;
#else
	  taille = BLOCKSIZE * sizeof (Item);
#endif
	  //printf("pid=%d, attente receive\n",mypid);

	  /* le recepteur doit avertir l'emetteur qu'il est pret a recevoir */
	  MPI_Send (&c, 1, MPI_CHAR, 0, 11, MPI_COMM_WORLD);

	  /* maintenant je me place en attente de reception */
#ifndef NO_COMPRESSION
	  for(j=0;j<(int)(taille/MAX_MSG_SZ);j++){
	    // printf("\n%d receive message numbre %d/%d",mypid,j,taille/MAX_MSG_SZ);
	    if ((recv_len = recvfrom (sock, cloneTab+j*MAX_MSG_SZ, MAX_MSG_SZ, 0,
				      (struct sockaddr *) &from_addr,
				      &from_len)) < 0)
	      {
		perror (" recvfrom() failed");
		exit (1);
	      }
	  } //for(j=0;j<(int)(taille/MAX...
	  // Last msg
	  if((taille % MAX_MSG_SZ) > 0)
	    {
	      // printf("\nlast message");
	      if ((recv_len = recvfrom (sock, cloneTab+(j*MAX_MSG_SZ), taille%MAX_MSG_SZ, 0,
					(struct sockaddr *) &from_addr,
					&from_len)) < 0)
		{
		  perror ("last msg recvfrom() failed");
		  exit (1);
		}
	    }
#else 
	  for(j=0;j<(int)(taille/MAX_MSG_SZ);j++){
	    //            memset(recv_buffer, 0, MAX_MSG_SZ);
	    //	    if ((recv_len = recvfrom (sock, recv_buffer, MAX_MSG_SZ, 0,
				      //				      (struct sockaddr *) &from_addr,
				     //		      &from_len)) < 0)
	      //     {
		//	perror ("recvfrom() failed");
		//	exit (1);
		//  }
            //memcpy(tab+((j*MAX_MSG_SZ)/sizeof(Item)),recv_buffer,MAX_MSG_SZ);

	    if ((recv_len = recvfrom (sock, tab+((j*MAX_MSG_SZ)/sizeof(Item)), MAX_MSG_SZ, 0,
				      (struct sockaddr *) &from_addr,
				      &from_len)) < 0)
	      {
		perror ("recvfrom() failed");
		exit (1);
	      }

	  } 
	   // Last msg

	  if((taille%MAX_MSG_SZ)>0)
	    {
              //memset(recv_buffer, 0, MAX_MSG_SZ);
	      //printf("RECV\n");
	      //if ((recv_len = recvfrom (sock, recv_buffer, taille%MAX_MSG_SZ, 0,
	      //				(struct sockaddr *) &from_addr,
	      //				&from_len)) < 0)
	      //	{
	      //	  perror ("recvfrom() failed");
	      //	  exit (1);
	      //	}
              //memcpy(tab+j*(MAX_MSG_SZ/sizeof(Item)),recv_buffer,taille%MAX_MSG_SZ);
	      if ((recv_len = recvfrom (sock, tab+j*(MAX_MSG_SZ/sizeof(Item)), taille%MAX_MSG_SZ, 0,
					(struct sockaddr *) &from_addr,
					&from_len)) < 0)
		{
		  perror ("recvfrom() failed");
		  exit (1);
		}
	    }	  
#endif
	  //printf("pid=%d, fin receive\n",mypid);

	  /* output received string */
	  /*
	     printf("Received %d bytes from %s: ", recv_len, 
	     inet_ntoa(from_addr.sin_addr));
	     printf("%s", recv_str);
	   */

	  /* send a DROP MEMBERSHIP message via setsockopt */
	  if ((setsockopt (sock, IPPROTO_IP, IP_DROP_MEMBERSHIP,
			   (void *) &mc_req, sizeof (mc_req))) < 0)

	    {
	      perror ("setsockopt() failed");
	      exit (1);
	    }

	  close (sock);
#ifndef NO_COMPRESSION
	  if (mypid != 0)
	    {
	      err =
		uncompress ((Bytef *) tab, &Len, (Bytef *) cloneTab, taille);
	      CHECK_ERR (err, "uncompress");
	    }
#endif
	}
      else
	{			/* partie des senders */

	  /*
	   * Starting sending messages
	   */

	  if (mypid == 0)
	    {

	      /* Je commence par recevoir des ACK des emetteurs qui me disent qu'ils sont */
	      /* prets a recevoir */

	      for (m = 1; m < nprocs; m++)
		MPI_Recv (&c, 1, MPI_CHAR, m, 11, MPI_COMM_WORLD, &status);

	      /* sender side */
	      mc_addr_str = argv[1];	/* arg 1: multicast IP address */
	      mc_port = atoi (argv[2]);	/* arg 2: multicast port number */

	      /* validate the port range */
	      if ((mc_port < MIN_PORT) || (mc_port > MAX_PORT))
		{
		  fprintf (stderr, "Invalid port number argument %d.\n",
			   mc_port);
		  fprintf (stderr, "Valid range is between %d and %d.\n",
			   MIN_PORT, MAX_PORT);
		  exit (1);
		}

	      /* create a socket for sending to the multicast address */
	      if ((sock = socket (PF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
		{
		  perror ("socket() failed");
		  exit (1);
		}

	      /* set the TTL (time to live/hop count) for the send */
	      if ((setsockopt (sock, IPPROTO_IP, IP_MULTICAST_TTL,
			       (void *) &mc_ttl, sizeof (mc_ttl))) < 0)
		{
		  perror ("setsockopt() failed");
		  exit (1);
		}

	      /* construct a multicast address structure */
	      memset (&mc_addr, 0, sizeof (mc_addr));
	      mc_addr.sin_family = AF_INET;
	      mc_addr.sin_addr.s_addr = inet_addr (mc_addr_str);
	      mc_addr.sin_port = htons (mc_port);

	      /* send string to multicast address */

#ifndef NO_COMPRESSION
	      taille = destLen;
#else
	      taille = BLOCKSIZE * sizeof (Item);
#endif

	      //    printf("pid=%d, emission\n",mypid);
#ifndef NO_COMPRESSION
	      for (j=0; j< (int)(taille/MAX_MSG_SZ); j++){
		if ((sendto (sock, cloneTab+j*MAX_MSG_SZ , MAX_MSG_SZ, 0,
			     (struct sockaddr *) &mc_addr,
			     sizeof (mc_addr))) != MAX_MSG_SZ)
		  {

		    printf ("PID=%d, size=%d\n", mypid, taille);
		    perror ("sendto() sent incorrect number of bytes");
		    exit (1);
		  }
	      } //  (j=0; j< (int)(taille/MAX....
	      //the last msg
	      if ((taille % MAX_MSG_SZ) > 0)
		{
		  if ((sendto (sock, cloneTab + j*MAX_MSG_SZ , taille%MAX_MSG_SZ, 0,
			       (struct sockaddr *) &mc_addr,
			       sizeof (mc_addr))) != taille%MAX_MSG_SZ)
		    {
		      printf ("PID=%d, size=%d\n", mypid, taille);
		      perror ("sendto() sent incorrect number of bytes");
		      exit (1);
		    }
		}
#else 
	      for (j=0; j< (int)(taille/MAX_MSG_SZ); j++){
		//                memcpy(send_buffer,tab+((j*MAX_MSG_SZ)/sizeof(Item)),MAX_MSG_SZ);
		//		if ((sendto (sock, send_buffer, MAX_MSG_SZ, 0,
			     //			     (struct sockaddr *) &mc_addr,
			     //			     sizeof (mc_addr))) != MAX_MSG_SZ)
		  //		  {
		    //		    printf ("PID=%d, size=%d\n", mypid, taille);
		    //		    perror ("sendto() sent incorrect number of bytes");
		    //		    exit (1);
		    //		  }
                //memset(send_buffer, 0, MAX_MSG_SZ);

		if ((sendto (sock, tab+((j*MAX_MSG_SZ)/sizeof(Item)), MAX_MSG_SZ, 0,
			     (struct sockaddr *) &mc_addr,
			     sizeof (mc_addr))) != MAX_MSG_SZ)
		  {
		    printf ("PID=%d, size=%d\n", mypid, taille);
		    perror ("sendto() sent incorrect number of bytes");
		    exit (1);
		  }

	      } //  (j=0; j< (int)(taille/MAX....
	      //the last msg
	      if ((taille % MAX_MSG_SZ) > 0)
		{   
		  //                  memcpy(send_buffer,tab+((j*MAX_MSG_SZ)/sizeof(Item)),taille%MAX_MSG_SZ);
		  //		  if ((sendto (sock, send_buffer, taille%MAX_MSG_SZ, 0,
			       //			       (struct sockaddr *) &mc_addr,
			       //			       sizeof (mc_addr))) != (taille % MAX_MSG_SZ))
		    //		    {
		      //		      printf("TOTO\n");
		      //		      printf ("PID=%d, size=%d\n", mypid, taille);
		      //		      perror ("sendto() sent incorrect number of bytes");
		      //		      exit (1);
		      //  }
                  // memset(send_buffer, 0, MAX_MSG_SZ);

		  if ((sendto (sock, tab+((j*MAX_MSG_SZ)/sizeof(Item)), taille%MAX_MSG_SZ, 0,
			       (struct sockaddr *) &mc_addr,
			       sizeof (mc_addr))) != (taille % MAX_MSG_SZ))
		    {
		      printf ("PID=%d, size=%d\n", mypid, taille);
		      perror ("sendto() sent incorrect number of bytes");
		      exit (1);
		    }

		}
	      
#endif
	      // printf("pid=%d, fin emission\n",mypid);
	      close (sock);
	    }

	}
#else

#ifdef NO_COMPRESSION
      MPI_Bcast (tab, BLOCKSIZE * sizeof (Item), MPI_CHAR, 0, MPI_COMM_WORLD);
#else
      // La compression par compress2 est faire plus haut
      //printf("Taille donnee compressee: %d\n",destLen);
      MPI_Bcast (cloneTab, destLen, MPI_CHAR, 0, MPI_COMM_WORLD);
#endif

      MPI_Barrier (MPI_COMM_WORLD);

#ifndef NO_COMPRESSION
      if (mypid != 0)
	{
	  err = uncompress ((Bytef *) tab, &Len, (Bytef *) cloneTab, destLen);
	  CHECK_ERR (err, "uncompress");
	}
#endif

#endif


      MPI_Barrier (MPI_COMM_WORLD);

      //MPI_Barrier(MPI_COMM_WORLD);
      if (mypid == 0)
	{
	  gettimeofday (&chrono2, &tempsz);
	  micro = chrono2.tv_usec - chrono1.tv_usec;
	  if (micro < 0)
	    {
	      micro = 1000000 + micro;
	      second = chrono2.tv_sec - chrono1.tv_sec - 1;
	    }
	  else
	    second = chrono2.tv_sec - chrono1.tv_sec;
	  printf ("Temps Execution = %d sec et %d microsec\n", second, micro);
	  f += (float) (second + (float) (micro / 1000000.0));
	  //printf ("%f\n", f);
	}

    }
  if (mypid == 0)
    {
      printf ("------------------------------------------------------\n");
      printf ("Temps moyen d'envoi/reception : %f sec\n", f / NB_EXPERIENCE);
#ifndef NO_COMPRESSION
      if (COMPRESSION_LEVEL != 0)
	printf ("Taux Compression Moyen : %f\%\n",
		((((float) s) / ((float) BLOCKSIZE * sizeof (Item)) /
		  (float) NB_EXPERIENCE) * 100.0));
      else
	printf ("Taux Compression Moyen : 0\% (mais avec recopie)\n");
#else
      printf ("Taux Compression Moyen : ne s'applique pas !\n");
#endif
      printf ("------------------------------------------------------\n");

      /* 
       * Il reste a nettoyer les files TC
       */
#ifdef TC
      sprintf (CommandeTC, "%stc qdisc del dev eth0 root", PathTC);
      system (CommandeTC);
      sprintf (CommandeTC, "%stc qdisc del dev eth0 ingress", PathTC);
      system (CommandeTC);
      /* fin du nettoyage */
#endif
    }

  MPI_Finalize ();
  exit (0);
}
