Changeset 944

Show
Ignore:
Timestamp:
10/20/06 12:10:17 (7 years ago)
Author:
leo
Message:

using packed message

Files:
1 modified

Legend:

Unmodified
Added
Removed
  • sandbox/trunk/pdp/src/Replication.cpp

    r943 r944  
    117117{ 
    118118        const int serverProcess = 0; 
    119         int rank = MPI::COMM_WORLD.Get_rank(); 
    120         int numProcess = MPI::COMM_WORLD.Get_size(); 
     119 
     120        MPI::Comm& comm = MPI::COMM_WORLD; 
     121 
     122        int rank = comm.Get_rank(); 
     123        int numProcess = comm.Get_size(); 
    121124 
    122125        if(numProcess < 2) 
    123126                XERIAL_THROW(Exception, "numProcess must be more than 2"); 
    124127 
    125         vector<uint8> msgBuffer(_bufferSize, 0); 
    126         uint8* buffer = &(msgBuffer[0]); 
     128        size_t maximumPackedDataSize = MPI::INT.Pack_size(1, comm) + MPI::BYTE.Pack_size(_bufferSize, comm); 
     129        XERIAL_DEBUG(_logger, "message size max = " << maximumPackedDataSize); 
     130 
     131        vector<uint8> fileBuffer(_bufferSize, 0); 
     132        vector<uint8> packedDataBuffer(maximumPackedDataSize, 0); 
    127133 
    128134    if(rank == serverProcess) 
     
    134140                uint64 fileSize = FileUtil::fileSize(_fileName); 
    135141                XERIAL_DEBUG(_logger, "file size = " << fileSize); 
    136                 MPI::COMM_WORLD.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 
    137  
     142                comm.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 
     143 
     144                 
    138145                uint64 sendBytes = 0; 
    139146                while(sendBytes < fileSize && !feof(fileHandle)) 
    140147                { 
    141                         int readBytes = static_cast<int>(fread(buffer, 1, _bufferSize, fileHandle)); 
     148                        int readBytes = static_cast<int>(fread(&(fileBuffer[0]), 1, _bufferSize, fileHandle)); 
    142149                        if(readBytes < static_cast<int>(_bufferSize)) 
    143150                        { 
     
    145152                                        XERIAL_THROW(Exception, format("actual read bytes = %1, specified read size = %2") % readBytes % int(_bufferSize)); 
    146153                        } 
    147                         MPI::COMM_WORLD.Bcast(&readBytes, 1, MPI::INT, serverProcess); 
    148                         MPI::COMM_WORLD.Bcast(buffer, readBytes, MPI::BYTE, serverProcess); 
     154                        int sendBufferPosition = 0; 
     155                        MPI::INT.Pack(&readBytes, 1, &(packedDataBuffer[0]), maximumPackedDataSize, sendBufferPosition, comm); 
     156                        MPI::BYTE.Pack(&(fileBuffer[0]), readBytes, &(packedDataBuffer[0]), maximumPackedDataSize, sendBufferPosition, comm); 
     157                         
     158                        comm.Bcast(&(packedDataBuffer[0]), sendBufferPosition, MPI::PACKED, serverProcess); 
    149159                        sendBytes += readBytes; 
    150160                        XERIAL_DEBUG(_logger, "send bytes = " << sendBytes); 
     
    154164        else 
    155165        { 
     166 
    156167                uint64 fileSize = 0; 
    157                 MPI::COMM_WORLD.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 
     168                comm.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 
    158169 
    159170                XERIAL_DEBUG(_logger, "incoming file size = " << fileSize); 
     
    165176                        XERIAL_THROW(Exception, "cannot open file " << outputFileName); 
    166177 
     178 
    167179                uint64 recievedBytes = 0; 
    168180                while(recievedBytes < fileSize) 
    169181                { 
     182                        comm.Bcast(&(packedDataBuffer[0]), maximumPackedDataSize, MPI::PACKED, serverProcess); 
    170183                        int incomingDataSize = 0; 
    171                         MPI::COMM_WORLD.Bcast(&incomingDataSize, 1, MPI::INT, serverProcess); 
     184                        int packedBufferPosition = 0; 
     185                        MPI::INT.Unpack(&(packedDataBuffer[0]), 1, &incomingDataSize, 1, packedBufferPosition, comm); 
    172186                        XERIAL_DEBUG(_logger, "incoming bytes = " << incomingDataSize); 
    173                         MPI::COMM_WORLD.Bcast(buffer, incomingDataSize, MPI::BYTE, serverProcess); 
    174  
    175                         size_t writesize = fwrite(buffer, 1, incomingDataSize, fileHandle); 
     187                        //MPI::BYTE.Unpack(&(packedDataBuffer[0]), incomingDataSize, &(fileBuffer[0]), _bufferSize, packedBufferPosition, comm); 
     188                        size_t writesize = fwrite(&(packedDataBuffer[packedBufferPosition]), 1, incomingDataSize, fileHandle); 
    176189                        if(writesize != incomingDataSize) 
    177190                                XERIAL_THROW(Exception, "write failed. write size =  " << writesize); 
     
    182195                fclose(fileHandle); 
    183196        } 
    184         XERIAL_INFO(_logger, "done."); 
     197        XERIAL_INFO(_logger, format("process %1%: done.") % rank); 
    185198} 
    186199void