Data Center
The Kofa data center takes care of managing CSV files and importing them.
Creating a data center
A data center can be created easily:
>>> from waeup.kofa.datacenter import DataCenter
>>> mydatacenter = DataCenter()
>>> mydatacenter
<waeup.kofa.datacenter.DataCenter object at 0x...>
Each data center has a location in the filesystem where files are stored:
>>> storagepath = mydatacenter.storage
>>> storagepath
'/tmp/tmp...'
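As the path suggests, the default storage lives in a freshly created temporary directory and exists right away:

>>> import os
>>> os.path.isdir(storagepath)
True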
Among other things, it provides two locations into which data of deleted and graduated items can be put:
>>> import os
>>> del_path = mydatacenter.deleted_path
>>> os.path.isdir(del_path)
True
>>> grad_path = mydatacenter.graduated_path
>>> os.path.isdir(grad_path)
True
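Assuming the default layout, where both locations live directly below the storage directory (the directory listings further down show the same names), their base names reflect their purpose:

>>> os.path.basename(del_path)
'deleted'
>>> os.path.basename(grad_path)
'graduated'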
Overall it complies with the IDataCenter interface:
>>> from zope.interface import verify
>>> from waeup.kofa.interfaces import IDataCenter
>>> verify.verifyObject(IDataCenter, DataCenter())
True
>>> verify.verifyClass(IDataCenter, DataCenter)
True
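As a quick counter-example (a sketch; zope.interface reports the failure with a DoesNotImplement exception whose message is shortened here), verification fails for an object that does not provide the interface:

>>> verify.verifyObject(IDataCenter, object())
Traceback (most recent call last):
...
DoesNotImplement: ...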
Managing the storage path
We can set another storage path:
>>> import os
>>> os.mkdir('newlocation')
>>> newpath = os.path.abspath('newlocation')
>>> mydatacenter.setStoragePath(newpath)
[]
The result is a list of filenames that could not be copied. Luckily, this list is empty.
When we set a new storage path, we can ask for all files in the old location to be moved to the new one. To see this feature in action, we first have to put a file into the old location:
>>> open(os.path.join(newpath, 'myfile.txt'), 'wb').write('hello')
Now we can set a new location and the file will be copied:
>>> verynewpath = os.path.abspath('verynewlocation')
>>> os.mkdir(verynewpath)
>>> mydatacenter.setStoragePath(verynewpath, move=True)
[]
>>> storagepath = mydatacenter.storage
>>> 'myfile.txt' in os.listdir(verynewpath)
True
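The storage attribute now points to the new location, which is why we re-fetched storagepath above:

>>> storagepath == verynewpath
True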
We remove the created file to have a clean testing environment for upcoming examples:
>>> os.unlink(os.path.join(storagepath, 'myfile.txt'))
Uploading files
We can get a list of files stored in that location:
>>> mydatacenter.getPendingFiles()
[]
Let’s put a file into the storage:
>>> import os
>>> filepath = os.path.join(storagepath, 'data.csv')
>>> open(filepath, 'wb').write('Some Content\n')
Now we can find a file:
>>> mydatacenter.getPendingFiles()
[<waeup.kofa.datacenter.DataCenterFile object at 0x...>]
As we can see, the actual file is wrapped by a convenience object that lets us fetch some data about the file. The data is returned as strings, so that it can easily be put into output pages:
>>> datafile = mydatacenter.getPendingFiles()[0]
>>> datafile.getSize()
'13 bytes'
>>> datafile.getDate() # Nearly current datetime...
'...'
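The reported size matches what we wrote above: 'Some Content\n' is 13 characters long:

>>> len('Some Content\n')
13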
Clean up:
>>> import shutil
>>> shutil.rmtree(newpath)
Distributing processed files
When files have been processed by a batch processor, we can distribute the resulting files to their destinations.
We recreate the data center root in case it is missing:
>>> import os
>>> dc_root = mydatacenter.storage
>>> fin_dir = os.path.join(dc_root, 'finished')
>>> unfin_dir = os.path.join(dc_root, 'unfinished')
>>> def recreate_dc_storage():
...     # Remove the data center root (if any), recreate it empty,
...     # and re-register it as storage path with the data center.
...     if os.path.exists(dc_root):
...         shutil.rmtree(dc_root)
...     os.mkdir(dc_root)
...     mydatacenter.setStoragePath(mydatacenter.storage)
>>> recreate_dc_storage()
We define a function that creates a set of faked result files:
>>> import os
>>> import tempfile
>>> def create_fake_results(source_basename, create_pending=True):
...     # Fake the outcome of a batch processor run: a source file in
...     # the data center root, a file of finished items and, if
...     # requested, a file of still pending items in a temp dir.
...     tmp_dir = tempfile.mkdtemp()
...     src = os.path.join(dc_root, source_basename)
...     pending_src = None
...     if create_pending:
...         pending_src = os.path.join(tmp_dir, 'mypendingsource.csv')
...     finished_src = os.path.join(tmp_dir, 'myfinishedsource.csv')
...     for path in (src, pending_src, finished_src):
...         if path is not None:
...             open(path, 'wb').write('blah')
...     return tmp_dir, src, finished_src, pending_src
Now we can create the set of result files that typically comes out of successful processing of a regular source, and try to distribute those files. Let’s start with a source file that was processed successfully:
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
... pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]
The temporary dir will be removed for us by the data center. This way we can be assured that fewer temporary dirs are left hanging around:
>>> os.path.exists(tmp_dir)
False
The root dir is empty, while the original file and the file containing all processed data were moved to ‘finished/’.
Now we restart, but this time we fake an erroneous action:
>>> recreate_dc_storage()
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
... pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'mysource.create.pending.csv', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv']
>>> sorted(os.listdir(unfin_dir))
['mysource.csv']
While the original source was moved to the ‘unfinished’ dir, the pending file went to the root, and the set of already processed items is stored in ‘finished/’.
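As the listings suggest, the pending file’s name encodes the base name of the source, the import mode and the processing status as dotted components:

>>> 'mysource.create.pending.csv'.split('.')
['mysource', 'create', 'pending', 'csv']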
We fake processing the pending file and assume that everything went well this time:
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.create.pending.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
... pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]
The result is the same as in the first case shown above.
We restart again, but this time we fake several non-working imports in a row.
We start with a faulty initial import:
>>> recreate_dc_storage()
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
... pending_src, mode='create')
We try to process the pending file, which fails again:
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.create.pending.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
... pending_src, mode='create')
We try to process the new pending file:
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.create.pending.csv')
>>> mydatacenter.distProcessedFiles(False, src, finished_src,
... pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'mysource.create.pending.csv', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv']
>>> sorted(os.listdir(unfin_dir))
['mysource.csv']
Finally, we process the pending file and everything works:
>>> tmp_dir, src, finished_src, pending_src = create_fake_results(
... 'mysource.create.pending.csv', create_pending=False)
>>> mydatacenter.distProcessedFiles(True, src, finished_src,
... pending_src, mode='create')
>>> sorted(os.listdir(dc_root))
['deleted', 'finished', 'graduated', 'logs', 'unfinished']
>>> sorted(os.listdir(fin_dir))
['mysource.create.finished.csv', 'mysource.csv']
>>> sorted(os.listdir(unfin_dir))
[]
The root dir is empty (contains no input files) and only the files in finished-subdirectory remain.
We can get a list of imported files stored in the finished subfolder:
>>> mydatacenter.getFinishedFiles()
[<waeup.kofa.datacenter.DataCenterFile object at ...>]
>>> datafile = mydatacenter.getFinishedFiles()[0]
>>> datafile.getSize()
'2 bytes'
>>> datafile.getDate() # Nearly current datetime...
'...'
Clean up:
>>> shutil.rmtree(verynewpath)