|
|||||||||||
|
bk commit into 6.0-falcon tree (hall:1.2609)
From: Christoffer Hall <hall(at)mysql.com>
Date: Fri Oct 05 2007 - 05:07:30 EDT
ChangeSet@1.2609, 2007-10-05 11:07:22+02:00, hall@helheim.(none) +15 -0 Implementation of Aio. storage/falcon/Aio.cpp@1.2, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +485 -0 Aio implementation. storage/falcon/Aio.cpp@1.1, 2007-09-24 14:21:22+02:00, hall@helheim.(none) +3 -0 BitKeeper file /home/hall/Falcon/mysql-5.1-falcon-naio/storage/falcon/Aio.cpp storage/falcon/Aio.cpp@1.0, 2007-09-24 14:21:22+02:00, hall@helheim.(none) +0 -0 storage/falcon/Aio.h@1.2, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +82 -6 Aio implementation. storage/falcon/Aio.h@1.1, 2007-09-24 14:20:30+02:00, hall@helheim.(none) +48 -0 BitKeeper file /home/hall/Falcon/mysql-5.1-falcon-naio/storage/falcon/Aio.h storage/falcon/Aio.h@1.0, 2007-09-24 14:20:30+02:00, hall@helheim.(none) +0 -0 storage/falcon/Connection.cpp@1.23, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +3 -2 Switch to Aio. storage/falcon/DatabaseClone.cpp@1.6, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +1 -1 Switch to Aio. storage/falcon/DatabaseClone.h@1.3, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -2 Switch to Aio. storage/falcon/Dbb.cpp@1.86, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -2 Switch to Aio. storage/falcon/Dbb.h@1.49, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +3 -2 Switch to Aio. storage/falcon/IO.cpp@1.23, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -0 Switch to Aio. storage/falcon/Makefile.am@1.69, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -2 Switch to Aio. storage/falcon/RepositoryVolume.cpp@1.23, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +1 -1 Switch to Aio. storage/falcon/SerialLog.cpp@1.114, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +1 -1 Switch to Aio. storage/falcon/SerialLog.h@1.53, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -2 Switch to Aio. storage/falcon/StorageDatabase.cpp@1.87, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +1 -1 Switch to Aio. storage/falcon/StorageHandler.cpp@1.54, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -2 Switch to Aio. storage/falcon/Transaction.cpp@1.131, 2007-10-05 11:07:17+02:00, hall@helheim.(none) +2 -1 Switch to Aio. diff -Nrup a/storage/falcon/Aio.cpp b/storage/falcon/Aio.cpp --- /dev/null Wed Dec 31 16:00:00 196900@@ -0,0 +1,488 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +// Aio.cpp: implementation of the Aio class. +// +////////////////////////////////////////////////////////////////////// + + +#ifdef _WIN32 +#include <windows.h> +#include <io.h> +#include <direct.h> +#define LSEEK _lseeki64 +#define SEEK_OFFSET int64 +#define MKDIR(dir) mkdir (dir) + +#else + +#include "config.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <fcntl.h> +#include <errno.h> +#include <string.h> +#include <limits.h> +#include <stdlib.h> +#include <signal.h> + + +#define O_RANDOM 0 +#define O_BINARY 0 + +#ifndef LSEEK +#define LSEEK lseek +#define SEEK_OFFSET off_t +#endif + +#define MKDIR(dir) mkdir (dir, S_IRUSR | S_IRGRP | S_IROTH | S_IWUSR | S_IWGRP | S_IXUSR | S_IXGRP) + +#endif + +#ifndef PATH_MAX +#define PATH_MAX 256 +#endif + +#include "Engine.h" +#include "Log.h" +#include "DateTime.h" +#include "Threads.h" +#include "SQLException.h" +#include "SQLError.h" +#include "Error.h" +#include "BDB.h" +#include "Hdr.h" +#include "Sync.h" +#include "Aio.h" +#include "Dbb.h" + +#ifdef _DEBUG +#undef THIS_FILE +static const char THIS_FILE[]=__FILE__; +#endif + + +const int AioThreadsMax = 32; +const int AioMaxPending = 512; + +Aio::Aio() +{ + aioWorkers = new Threads(NULL, AioThreadsMax); + rqs = new AioRQ[AioMaxPending]; + rqsPending = 0; + + fatalError = false; + dbb = NULL; + reads = writes = 0; + fetches = fakes = 0; + priorReads = priorWrites = priorFetches = priorFakes = 0; +} + +Aio::~Aio() +{ + aioWorkers->shutdownAll(); + aioWorkers->waitForAll(); + + //delete aioWorkers; + delete rqs; +} + +void Aio::signalAioRQ(AioRQ *rq) +{ + // singal BDB +} + +void Aio::freeAioRQ(AioRQ *rq) +{ + Sync sync(&syncObject, "Aio::submitBdb"); + sync.lock(Exclusive); + + rq->isFree = true; + rqsPending--; + +} + +int Aio::allocAioRQ() +{ + Sync sync(&syncObject, "Aio::allocAioRQ"); + + for (;;) + { + + sync.lock(Exclusive); + + for (int i = 0; i < AioMaxPending; i++) + if (rqs[i].isFree) + { + rqs[i].isFree = false; + rqsPending++; + return i; + } + + sync.unlock(); + allocSync.sleep(); + } +} + +void Aio::submitBdb(Bdb* bdb, bool async, bool isWrite) +{ + + + int i = allocAioRQ(); + AioRQ *rq = &rqs[i]; + + rq->offset = (int64) bdb->pageNumber * pageSize; + rq->fd = fd; + rq->size = pageSize; + rq->buffer = bdb->buffer; + rq->aio = this; + rq->bdb = bdb; + rq->isWrite = isWrite; + + if (async) + { + aioWorkers->startWhenever("Aio worker thread", Aio::aioWorkerThread, rq); + return; + } + + + if (isWrite) + pwrite(rq); + else + pread(rq); + + signalAioRQ(rq); + freeAioRQ(rq); +} + + +void Aio::write(int64 offset, int32 size, UCHAR *buffer) +{ + int i = allocAioRQ(); + AioRQ *rq = &rqs[i]; + + rq->offset = offset; + rq->fd = fd; + rq->size = size; + rq->buffer = buffer; + rq->aio = this; + rq->bdb = NULL; + rq->isWrite = true; + + pwrite(rq); + +} + +void Aio::read(int64 offset, int32 size, UCHAR *buffer) +{ + int i = allocAioRQ(); + AioRQ *rq = &rqs[i]; + + rq->offset = offset; + rq->fd = fd; + rq->size = size; + rq->buffer = buffer; + rq->aio = this; + rq->bdb = NULL; + rq->isWrite = false; + + pread(rq); + +} + +void Aio::aioWorkerThread(void *argument) +{ + + AioRQ *rq = (AioRQ *) argument; + Aio *aio = rq->aio; + + Sync sync(&aio->syncObject, "Aio::aioWorkerThread"); + if (rq->isWrite) + aio->pwrite(rq); + else + aio->pread(rq); + + aio->signalAioRQ(rq); + aio->freeAioRQ(rq); +} + +void Aio::pread(AioRQ *rq) +{ + int ret; + +#if defined(HAVE_PREAD) && !defined(HAVE_BROKEN_PREAD) + + ret = ::pread (rq->fd, rq->buffer, rq->size, rq->offset); + +#else + Sync sync (&syncObject, "IO::pread"); + sync.lock (Exclusive); + + int64 off = LSEEK(rq->fd, rq->offset, SEEK_SET); + ASSERT(off == rq->offset); + + ret = (int) read(rq->fd, rq->buffer, rq->size); + + ASSERT(ret == rq->size); + +#endif + + reads++; +} + +void Aio::pwrite(AioRQ *rq) +{ + int ret = ::pwrite (rq->fd, rq->buffer, rq->size, rq->offset); + ASSERT(ret == rq->size); + writes++; +} + +void Aio::sync() +{ + time_t before = DateTime::getNow(); + fsync(fd); + time_t delta = DateTime::getNow() - before; + + Log::debug("Fsync took %d seconds on %d\n", delta, fd); +} + +bool Aio::openFile(const char * name, bool readOnly) +{ + fileName = name; + fd = ::open (fileName, (readOnly) ? O_RDONLY | O_BINARY : O_RDWR | O_BINARY); + + if (fd < 0) + throw SQLEXCEPTION (CONNECTION_ERROR, "can't open file \"%s\": %s (%d)", + name, strerror (errno), errno); + +#ifndef _WIN32 + signal (SIGXFSZ, SIG_IGN); + +#ifndef STORAGE_ENGINE + if (flock (fd, (readOnly) ? LOCK_SH : LOCK_EX)) + { + ::close (fd); + throw SQLEXCEPTION (CONNECTION_ERROR, "file \"%s\" in use by another process", name); + } +#endif +#endif + + //Log::debug("IO::openFile %s (%d) fd: %d\n", (const char*) fileName, readOnly, fileId); + + return fd != -1; +} + +bool Aio::createFile(const char *name, uint64 initialAllocation) +{ + Log::debug("Aio::createFile: creating file \"%s\"\n", name); + + fileName = name; + fd = ::open (fileName, + O_CREAT | O_RDWR | O_RANDOM | O_TRUNC | O_BINARY, + S_IREAD | S_IWRITE | S_IRGRP | S_IWGRP); + + if (fd < 0) + throw SQLEXCEPTION (CONNECTION_ERROR, "can't create file \"%s\", %s (%d)", + name, strerror (errno), errno); + +#ifndef _WIN32 +#ifndef STORAGE_ENGINE + flock(fileId, LOCK_EX); +#endif +#endif + + if (initialAllocation) + { + UCHAR *raw = new UCHAR[8192 * 257]; + UCHAR *aligned = (UCHAR*) (((UIPTR) raw + 8191) / 8192 * 8192); + uint size = 8192 * 256; + memset(aligned, 0, size); + uint64 offset = 0; + + for (uint64 remaining = initialAllocation; remaining;) + { + uint n = (int) MIN(remaining, size); + write(offset, n, aligned); + offset += n; + remaining -= n; + } + + delete [] raw; + sync(); + } + + return fd != -1; +} + +bool Aio::isOpen() +{ + return fd != -1; +} + +void Aio::seek(int pageNumber) +{ + SEEK_OFFSET pos = (int64) pageNumber * pageSize; + SEEK_OFFSET result = LSEEK (fd, pos, SEEK_SET); + + if (result != pos) + Error::error ("long seek failed on page %d of \"%s\"", + pageNumber, (const char*) fileName); +} + +void Aio::createPath(const char *fileName) +{ + // First, better make sure directories exists + + char directory [256], *q = directory; + + for (const char *p = fileName; *p;) + { + char c = *p++; + + if (c == '/' || c == '\\') + { + *q = 0; + + if (q > directory && q [-1] != ':') + if (MKDIR (directory) && errno != EEXIST) + throw SQLError (IO_ERROR, "can't create directory \"%s\"\n", directory); + } + *q++ = c; + } +} + +void Aio::expandFileName(const char *fileName, int length, char *buffer) +{ +#ifdef _WIN32 + char expandedName [PATH_MAX], *baseName; + GetFullPathName (fileName, sizeof (expandedName), expandedName, &baseName); + const char *path = expandedName; +#else + char expandedName [PATH_MAX]; + expandedName [0] = 0; + const char *path = realpath (fileName, expandedName); + + if (!path) + if (errno == ENOENT && expandedName [0]) + path = expandedName; + else + path = fileName; +#endif + if ((int) strlen (path) >= length) + throw SQLError (IO_ERROR, "expanded filename exceeds buffer length\n"); + + strcpy (buffer, path); +} + +bool Aio::doesFileExits(const char *fileName) +{ + struct stat stats; + + return stat(fileName, &stats) == 0; +} + +void Aio::declareFatalError() +{ + fatalError = true; +} + +void Aio::closeFile() +{ + if (fd != -1) + { + ::close (fd); + //Log::debug("IO::closeFile %s fd: %d\n", (const char*) fileName, fileId); + fd = -1; + } +} + +void Aio::deleteFile(const char* fileName) +{ + unlink(fileName); +} + +void Aio::deleteFile() +{ + deleteFile(fileName); +} + +// sync wrappers +void Aio::readPage(Bdb * bdb) +{ + + if (fatalError) + FATAL ("can't continue after fatal error"); + + submitBdb(bdb, false, false); +} + +void Aio::readHeader(Hdr * header) +{ + int n = lseek (fd, 0, SEEK_SET); + n = ::read (fd, header, sizeof (Hdr)); + + if (n != sizeof (Hdr)) + FATAL ("read error on database header"); +} + +void Aio::writePage(Bdb * bdb) +{ + + if (fatalError) + FATAL ("can't continue after fatal error"); + + submitBdb(bdb, false, true); +} + +void Aio::write(uint32 length, const UCHAR *data) +{ + uint32 len = ::write (fd, data, length); + + if (len != length) + throw SQLError(IO_ERROR, "bad write length, %d of %d requested", len, length); +} + +int Aio::read(int length, UCHAR *buffer) +{ + return ::read(fd, buffer, length); +} + +void Aio::writeHeader(Hdr *header) +{ + int n = lseek (fd, 0, SEEK_SET); + n = ::write (fd, header, sizeof (Hdr)); + + if (n != sizeof (Hdr)) + FATAL ("write error on database clone header"); +} + +bool Aio::trialRead(Bdb *bdb) +{ + Sync sync (&syncObject, "Aio::trialRead"); + sync.lock (Exclusive); + + int64 offset = bdb->pageNumber * pageSize; + LSEEK(fd, offset, SEEK_SET); + int length = ::read (fd, bdb->buffer, pageSize); + + if (length != pageSize) + return false; + + ++reads; + + return true; +} diff -Nrup a/storage/falcon/Aio.h b/storage/falcon/Aio.h --- /dev/null Wed Dec 31 16:00:00 196900@@ -0,0 +1,124 @@ +/* Copyright (C) 2006 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; version 2 of the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +// Aio.h: interface for the IO class. +// +////////////////////////////////////////////////////////////////////// + +#if !defined(AFX_AIO_H__) +#define AFX_AIO_H__ + +#include "JString.h" +#include "SyncObject.h" + +#include "Synchronize.h" + +class Bdb; +class Hdr; +class Dbb; +class Threads; +class Aio; + + +class AioRQ +{ +public: + int fd; + int64 offset; + int32 size; + void *buffer; + bool isWrite; + bool isFree; + Aio *aio; + + Bdb *bdb; +}; + + +class Aio +{ +public: + Aio(); + ~Aio(); + + void submitRead(Bdb *buffer, bool async); + void submitWrite(Bdb *buffer, bool async); + + static void aioWorkerThread(void *argument); + + void sync(); + + bool openFile(const char * name, bool readOnly); + bool createFile(const char *name, uint64 initialAllocation); + static void createPath(const char *fileName); + static bool doesFileExits(const char *fileName); + static void expandFileName(const char *fileName, int length, char *buffer); + static void deleteFile(const char* fileName); + void deleteFile(); + void closeFile(); + + void write(int64 offset, int32 size, UCHAR *buffer); + void read(int64 offset, int32 size, UCHAR *buffer); + + void readPage(Bdb * bdb); + void writePage(Bdb * bdb); + void readHeader(Hdr * header); + void writeHeader(Hdr *header); + bool trialRead(Bdb *bdb); + + void declareFatalError(); + + bool isOpen(); + void seek(int pageNumber); + void write(uint32 length, const UCHAR *data); + int read(int length, UCHAR *buffer); + + JString fileName; + int pageSize; + int32 reads; + int32 writes; + int32 fetches; + int32 fakes; + int32 priorReads; + int32 priorWrites; + int32 priorFetches; + int32 priorFakes; + Dbb *dbb; + +private: + + void submitBdb(Bdb *bdb, bool async, bool isWrite); + + int allocAioRQ(); + void signalAioRQ(AioRQ *rq); + void freeAioRQ(AioRQ *rq); + + void pread(AioRQ *rq); + void pwrite(AioRQ *rq); + + Threads *aioWorkers; + AioRQ *rqs; + int rqsPending; + int fd; + + bool fatalError; + + SyncObject syncObject; + Synchronize allocSync; +}; + +#endif + + diff -Nrup a/storage/falcon/Connection.cpp b/storage/falcon/Connection.cpp --- a/storage/falcon/Connection.cpp 2007-08-25 01:14:02 +02:00 #include "Server.h" -#include "IOx.h" +//#include "IOx.h" +#include "Aio.h"
#ifndef STORAGE_ENGINE
// we should ignore the registry. if (filename) - IO::expandFileName(filename, sizeof(dbFileName), dbFileName);@@ -47,7 +47,7 @@ void DatabaseClone::close(void) void DatabaseClone::createFile(const char* fileName) { - shadow = new IO;@@ -18,7 +18,7 @@ #include "DatabaseCopy.h"
-class IO;
@@ -36,7 +36,7 @@ public: void writeHeader(Hdr* header); void writePage(Bdb* bdb); - IO *shadow; }; diff -Nrup a/storage/falcon/Dbb.cpp b/storage/falcon/Dbb.cpp --- a/storage/falcon/Dbb.cpp 2007-08-28 00:58:30 +02:00 @@ -1096,7 +1096,7 @@ void Dbb::printPage(Bdb* bdb)
void Dbb::close()
- if (fileId != -1)@@ -24,7 +24,8 @@ #pragma once #endif // _MSC_VER >= 1000
-#include "IOx.h"
#include "HdrState.h" #include "PageType.h" #include "SyncObject.h" @@ -101,7 +102,7 @@ class DeferredIndex; class DatabaseCopy; class DatabaseClone;
-class Dbb : public IO
void updateTableSpaceSection (int id); diff -Nrup a/storage/falcon/IO.cpp b/storage/falcon/IO.cpp --- a/storage/falcon/IO.cpp 2007-08-22 18:41:34 +02:00 } + + diff -Nrup a/storage/falcon/Makefile.am b/storage/falcon/Makefile.am --- a/storage/falcon/Makefile.am 2007-09-16 19:42:34 +02:00 -falcon_sources= Agent.cpp Alias.cpp \ +falcon_sources= Agent.cpp Aio.cpp Alias.cpp \ AsciiBlob.cpp \ BDB.cpp \ BigInt.cpp \ diff -Nrup a/storage/falcon/RepositoryVolume.cpp b/storage/falcon/RepositoryVolume.cpp --- a/storage/falcon/RepositoryVolume.cpp 2007-08-30 19:41:50 +02:00@@ -209,7 +209,7 @@ void RepositoryVolume::makeWritable()
void RepositoryVolume::create()
- IO::createPath (fileName);}
-uint32 SerialLog::appendLog(IO *shadow, int lastPage)
Sync sync(&syncWrite, "SerialLog::appendLog"); sync.lock(Exclusive); diff -Nrup a/storage/falcon/SerialLog.h b/storage/falcon/SerialLog.h --- a/storage/falcon/SerialLog.h 2007-08-31 18:23:29 +02:00@@ -67,7 +67,7 @@ class SerialLogControl; class SerialLogWindow; class SerialLogTransaction; class Bitmap; -class IO; +class Aio; class RecoveryObjects; class Sync; class Transaction; @@ -87,7 +87,7 @@ public: UCHAR* allocBuffer(); void shutdownNow(); void dropDatabase(); - uint32 appendLog (IO *shadow, int lastPage);@@ -32,7 +32,8 @@ #include "Table.h" #include "Interlock.h" #include "SavePoint.h" -#include "IOx.h" +//#include "IOx.h" +#include "Aio.h" #include "DeferredIndex.h" #include "TransactionManager.h" #include "SerialLog.h" -- MySQL Code Commits Mailing List For list archives: http://lists.mysql.com/commits To unsubscribe: http://lists.mysql.com/commits?unsub=lists@pantek.comReceived on Wed Nov 28 04:21:36 2007 This archive was generated by hypermail 2.1.8 : Thu Jul 03 2008 - 13:58:04 EDT |
||||||||||
|
|||||||||||