Changeset 944
- Timestamp:
- 10/20/06 12:10:17 (7 years ago)
- Files:
-
- 1 modified
-
sandbox/trunk/pdp/src/Replication.cpp (modified) (6 diffs)
Legend:
- Unmodified
- Added
- Removed
-
sandbox/trunk/pdp/src/Replication.cpp
r943 r944 117 117 { 118 118 const int serverProcess = 0; 119 int rank = MPI::COMM_WORLD.Get_rank(); 120 int numProcess = MPI::COMM_WORLD.Get_size(); 119 120 MPI::Comm& comm = MPI::COMM_WORLD; 121 122 int rank = comm.Get_rank(); 123 int numProcess = comm.Get_size(); 121 124 122 125 if(numProcess < 2) 123 126 XERIAL_THROW(Exception, "numProcess must be more than 2"); 124 127 125 vector<uint8> msgBuffer(_bufferSize, 0); 126 uint8* buffer = &(msgBuffer[0]); 128 size_t maximumPackedDataSize = MPI::INT.Pack_size(1, comm) + MPI::BYTE.Pack_size(_bufferSize, comm); 129 XERIAL_DEBUG(_logger, "message size max = " << maximumPackedDataSize); 130 131 vector<uint8> fileBuffer(_bufferSize, 0); 132 vector<uint8> packedDataBuffer(maximumPackedDataSize, 0); 127 133 128 134 if(rank == serverProcess) … … 134 140 uint64 fileSize = FileUtil::fileSize(_fileName); 135 141 XERIAL_DEBUG(_logger, "file size = " << fileSize); 136 MPI::COMM_WORLD.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 137 142 comm.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 143 144 138 145 uint64 sendBytes = 0; 139 146 while(sendBytes < fileSize && !feof(fileHandle)) 140 147 { 141 int readBytes = static_cast<int>(fread( buffer, 1, _bufferSize, fileHandle));148 int readBytes = static_cast<int>(fread(&(fileBuffer[0]), 1, _bufferSize, fileHandle)); 142 149 if(readBytes < static_cast<int>(_bufferSize)) 143 150 { … … 145 152 XERIAL_THROW(Exception, format("actual read bytes = %1, specified read size = %2") % readBytes % int(_bufferSize)); 146 153 } 147 MPI::COMM_WORLD.Bcast(&readBytes, 1, MPI::INT, serverProcess); 148 MPI::COMM_WORLD.Bcast(buffer, readBytes, MPI::BYTE, serverProcess); 154 int sendBufferPosition = 0; 155 MPI::INT.Pack(&readBytes, 1, &(packedDataBuffer[0]), maximumPackedDataSize, sendBufferPosition, comm); 156 MPI::BYTE.Pack(&(fileBuffer[0]), readBytes, &(packedDataBuffer[0]), maximumPackedDataSize, sendBufferPosition, comm); 157 158 comm.Bcast(&(packedDataBuffer[0]), sendBufferPosition, MPI::PACKED, serverProcess); 149 159 sendBytes += readBytes; 150 160 XERIAL_DEBUG(_logger, "send bytes = " << sendBytes); … … 154 164 else 155 165 { 166 156 167 uint64 fileSize = 0; 157 MPI::COMM_WORLD.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess);168 comm.Bcast(&fileSize, 1, MPI::INTEGER8, serverProcess); 158 169 159 170 XERIAL_DEBUG(_logger, "incoming file size = " << fileSize); … … 165 176 XERIAL_THROW(Exception, "cannot open file " << outputFileName); 166 177 178 167 179 uint64 recievedBytes = 0; 168 180 while(recievedBytes < fileSize) 169 181 { 182 comm.Bcast(&(packedDataBuffer[0]), maximumPackedDataSize, MPI::PACKED, serverProcess); 170 183 int incomingDataSize = 0; 171 MPI::COMM_WORLD.Bcast(&incomingDataSize, 1, MPI::INT, serverProcess); 184 int packedBufferPosition = 0; 185 MPI::INT.Unpack(&(packedDataBuffer[0]), 1, &incomingDataSize, 1, packedBufferPosition, comm); 172 186 XERIAL_DEBUG(_logger, "incoming bytes = " << incomingDataSize); 173 MPI::COMM_WORLD.Bcast(buffer, incomingDataSize, MPI::BYTE, serverProcess); 174 175 size_t writesize = fwrite(buffer, 1, incomingDataSize, fileHandle); 187 //MPI::BYTE.Unpack(&(packedDataBuffer[0]), incomingDataSize, &(fileBuffer[0]), _bufferSize, packedBufferPosition, comm); 188 size_t writesize = fwrite(&(packedDataBuffer[packedBufferPosition]), 1, incomingDataSize, fileHandle); 176 189 if(writesize != incomingDataSize) 177 190 XERIAL_THROW(Exception, "write failed. write size = " << writesize); … … 182 195 fclose(fileHandle); 183 196 } 184 XERIAL_INFO(_logger, "done.");197 XERIAL_INFO(_logger, format("process %1%: done.") % rank); 185 198 } 186 199 void


