Source code for evnrg.storage.datastorage

import os
import errno
import uuid
import re
import pathlib
from typing import NamedTuple

from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import (
    Provider,
    ContainerDoesNotExistError,
    ObjectDoesNotExistError
)
from libcloud.storage.providers import get_driver, DRIVERS
import appdirs
import pandas as pd

# Per-user directories for the 'evnrg' application, as resolved by `appdirs`.
# Cache directory; the default location for temporary DataFrame files.
TMP_DIR = appdirs.user_cache_dir('evnrg')
# Configuration directory.
CONFIG_DIR = appdirs.user_config_dir('evnrg')
# Data directory.
LOCAL_DATA = appdirs.user_data_dir('evnrg')
# Default base path used when emulating object storage on the local disk.
LOCAL_STORAGE = os.path.join(LOCAL_DATA, 'local-object-storage')


# Short, user-friendly provider names mapped to the Libcloud `Provider`
# constants they stand for. Several aliases may point at the same provider.
PROVIDER_ALIASES = {
    # Amazon S3
    's3': Provider.S3,
    'amazon': Provider.S3,

    # Google Cloud Storage
    'gcs': Provider.GOOGLE_STORAGE,
    'google': Provider.GOOGLE_STORAGE,

    # Backblaze B2
    'b2': Provider.BACKBLAZE_B2,
    'backblaze': Provider.BACKBLAZE_B2,

    # Digital Ocean Spaces
    'digitalocean': Provider.DIGITALOCEAN_SPACES,
    'do': Provider.DIGITALOCEAN_SPACES,
    'spaces': Provider.DIGITALOCEAN_SPACES,

    # Microsoft Azure
    'microsoft': Provider.AZURE_BLOBS,
    'azure': Provider.AZURE_BLOBS,
}


class StorageInfo(NamedTuple):
    """Describes where and how the simulation stores its data.

    Attributes:
        key (str): Account ID or login key for the storage provider.
            Google Cloud Storage, for example, uses an IAM email address
            found in the 'client_email' key of a generated JSON file (see
            [Google Cloud's IAM documentation](https://cloud.google.com/iam/docs/)).
            For local storage the key is the base path under which cloud
            storage is emulated. Defaults to `LOCAL_STORAGE`, typically
            `~/.local/share/evnrg/local-object-storage`.
        secret (str): Secret key or passphrase for the account. For Google
            Cloud it is the 'private_key' field of the JSON file.
            Defaults to the empty string ('').
        bucket (str): Name of the container (bucket) that data is read
            from and written to. Defaults to 'evnrg-default'. For local
            storage the bucket is a folder under the key:
            `<key>/<bucket>`.
        provider (str): The storage provider. Accepts any provider string
            that [Apache LibCloud](https://libcloud.readthedocs.io/en/latest/supported_providers.html#id195)
            accepts, plus these shortcuts:

            * Google Cloud Storage: 'gcs', 'google'
            * Amazon S3: 's3', 'amazon'
            * Backblaze B2: 'b2', 'backblaze'
            * Digital Ocean Spaces: 'do', 'spaces', 'digitalocean'
            * Microsoft Azure: 'azure', 'microsoft'

            Defaults to 'local'.
        cache_dir (str): Local cache directory used when dumping
            dataframes for upload and when downloading dataframes for
            use. Defaults to `TMP_DIR`, typically `~/.cache/evnrg`.
        create_bucket (bool): Whether the bucket should be created when
            it does not exist. Defaults to `True`.
    """

    key: str = LOCAL_STORAGE
    secret: str = ''
    bucket: str = 'evnrg-default'
    provider: str = 'local'
    cache_dir: str = TMP_DIR
    create_bucket: bool = True
class UploadResult(NamedTuple):
    """Record describing one object that was uploaded.

    Attributes:
        uid (str): The uuid assigned to the object.
        filetype (str): The format the object was written in; generally
            one of `'parquet'`, `'json'`, `'records'`, or `'csv'`.
        cache_path (str): Path of the temporary local file.
        remote_path (str): Name of the object in remote storage.
        obj (libcloud.storage.base.Object): The LibCloud storage object.
    """

    uid: str
    filetype: str
    cache_path: str
    remote_path: str
    obj: Object
class DataHandler(object):
    """Move pandas DataFrames to and from an object-storage container.

    Wraps a Libcloud storage driver together with one container (bucket).
    DataFrames are serialized into a local cache directory before being
    uploaded, and downloaded objects are written there before being parsed.

    Attributes:
        driver: Libcloud storage driver built from the provider, key and
            secret of the `StorageInfo`.
        container: The active container, or `None` when the bucket does
            not exist and was not created.
        cache_dir (str): Local directory that holds temporary files.
        temp (list): Paths of temporary files that `cleanup()` removes.
    """

    __slots__ = (
        'driver',
        'container',
        'cache_dir',
        'temp'
    )

    # (filetype, file suffix) for every supported upload format, in the
    # order they are processed. The filetype doubles as the remote folder.
    _UPLOAD_FORMATS = (
        ('parquet', '.parquet'),
        ('json', '.json'),
        ('records', '.records.json'),
        ('csv', '.csv'),
    )

    def __init__(self, storage_info: 'StorageInfo'):
        """Create the driver, the cache directory and select the bucket.

        Args:
            storage_info (StorageInfo): Provider, credentials, bucket and
                cache settings.

        Raises:
            IndexError: If the provider (after alias resolution) is not a
                known Libcloud storage driver. An empty provider is
                rejected the same way.
        """
        provider_id = PROVIDER_ALIASES.get(
            storage_info.provider,
            storage_info.provider
        )
        # Validate unconditionally so an empty provider fails here with a
        # clear error instead of later with an unbound name.
        if provider_id not in DRIVERS:
            raise IndexError('Invalid object strage provider string.')

        self.driver = get_driver(provider_id)(
            storage_info.key,
            storage_info.secret
        )

        self.cache_dir = storage_info.cache_dir
        self.temp = []

        # Create the path if it doesn't exist
        pathlib.Path(self.cache_dir).mkdir(parents=True, exist_ok=True)

        try:
            self.container = self.driver.get_container(
                container_name=storage_info.bucket
            )
        except ContainerDoesNotExistError:
            # The StorageInfo field is named `create_bucket`.
            if storage_info.create_bucket:
                self.container = self.driver.create_container(
                    container_name=storage_info.bucket
                )
            else:
                self.container = None

    @property
    def ready(self):
        """bool: `True` when a container is currently selected."""
        return self.container is not None

    def use_bucket(self, bucket: str, create: bool = False):
        """Switch to another bucket, optionally creating it.

        Args:
            bucket (str): Name of the container to use.
            create (bool): Create the container when it does not exist.
                Defaults to `False`.

        Returns:
            bool: `True` if the container is now selected. On failure
            `False` is returned and `container` is set to `None`.
        """
        try:
            self.container = self.driver.get_container(container_name=bucket)
            return True
        except ContainerDoesNotExistError:
            if create:
                try:
                    self.container = self.driver.create_container(
                        container_name=bucket
                    )
                    return True
                except Exception:
                    # Best effort: any creation failure is reported as False.
                    self.container = None
            else:
                self.container = None
        return False

    def cleanup(self):
        """Delete every registered temporary file and forget them."""
        for fn in self.temp:
            if os.path.isfile(fn):
                os.remove(fn)
        # Drop the processed paths so repeated calls do not re-check them.
        self.temp.clear()

    @staticmethod
    def _write_frame(df, filetype, path):
        """Serialize `df` to `path` in the given upload format."""
        if filetype == 'parquet':
            df.to_parquet(path, engine='fastparquet', compression='gzip')
        elif filetype == 'json':
            df.to_json(
                path, orient='split', date_format='iso', date_unit='s'
            )
        elif filetype == 'records':
            df.to_json(
                path, orient='records', date_format='iso', date_unit='s'
            )
        elif filetype == 'csv':
            df.to_csv(path, date_format='%Y-%m-%dT%H:%M:%S')

    def upload_data(self,
                    df: pd.DataFrame,
                    obj_path: str,
                    formats: str = 'parquet',
                    keep_temp: bool = False,
                    enable_cleanup: bool = True):
        """Serialize a DataFrame and upload it in one or more formats.

        Every requested format is written to
        `<cache_dir>/<uid><suffix>` and uploaded as
        `<obj_path>/<filetype>/<uid><suffix>`, where `uid` is a fresh uuid
        shared by all formats of this call.

        Args:
            df (pd.DataFrame): The data to upload.
            obj_path (str): Remote base path; trailing slashes are ignored.
            formats (str): Any of 'parquet', 'json', 'records' and 'csv',
                separated by non-word characters (e.g. 'parquet,csv').
                Unknown names are ignored. Defaults to 'parquet'.
            keep_temp (bool): Keep the local files after the upload.
                Defaults to `False`.
            enable_cleanup (bool): Register kept local files so that
                `cleanup()` deletes them later. Defaults to `True`.

        Returns:
            list: One `UploadResult` for each upload for which the driver
            returned a truthy object.
        """
        uid = uuid.uuid4().hex
        requested = set(re.findall(r'[\w]+', formats))
        local_base = os.path.join(self.cache_dir, uid)
        remote_base = obj_path.rstrip('/') + '/'

        results = []
        written = []

        try:
            for filetype, suffix in self._UPLOAD_FORMATS:
                if filetype not in requested:
                    continue

                local_file = local_base + suffix
                remote_name = remote_base + filetype + '/' + uid + suffix

                self._write_frame(df, filetype, local_file)
                written.append(local_file)

                o = self.driver.upload_object(
                    local_file,
                    self.container,
                    remote_name
                )
                if o:
                    results.append(
                        UploadResult(
                            uid=uid,
                            filetype=filetype,
                            cache_path=local_file,
                            remote_path=remote_name,
                            obj=o
                        )
                    )
        finally:
            # Runs on failure too, so a failed upload leaves no stray files.
            if not keep_temp:
                # Delete temporary files
                for to_remove in written:
                    if os.path.isfile(to_remove):
                        os.remove(to_remove)
            elif enable_cleanup:
                # Add to file list for cleanup
                self.temp.extend(
                    fn for fn in written if os.path.isfile(fn)
                )

        return results

    def read_data(self,
                  obj_name: str,
                  fmt: str = 'parquet',
                  read_fn=None,
                  arguments: dict = None):
        """Download an object and load it.

        The object is saved in `cache_dir` under the last path component
        of `obj_name` and registered for `cleanup()`.

        Args:
            obj_name (str): Name of the remote object.
            fmt (str): 'parquet', 'csv', 'json' (split orientation) or
                'json-records' (records orientation, as written by the
                'records' upload format). Defaults to 'parquet'.
            read_fn (callable): Used as `read_fn(path, **arguments)` when
                `fmt` is none of the built-in formats.
            arguments (dict): Extra keyword arguments for the reader; they
                override the built-in defaults. Defaults to none.

        Returns:
            The loaded data, or `None` if the object does not exist, the
            download fails, or no reader matches `fmt`.
        """
        extra = {} if arguments is None else arguments

        try:
            o = self.driver.get_object(self.container.name, obj_name)
        except ObjectDoesNotExistError:
            return None

        if o is None:
            return None

        fname = obj_name.split('/')[-1]
        # Use the handler's own cache directory: it is the one created in
        # __init__ and the one upload_data() writes to.
        tmp_name = os.path.join(self.cache_dir, fname)

        downloaded = self.driver.download_object(
            o,
            tmp_name,
            overwrite_existing=True,
            delete_on_failure=True
        )
        if not downloaded:
            return None

        self.temp.append(tmp_name)

        if fmt == 'parquet':
            # Default to fastparquet
            d_ = {'engine': 'fastparquet'}
            d_.update(extra)
            return pd.read_parquet(tmp_name, **d_)
        if fmt == 'csv':
            return pd.read_csv(tmp_name, **extra)
        if fmt == 'json':
            d_ = {'orient': 'split'}
            d_.update(extra)
            return pd.read_json(tmp_name, **d_)
        if fmt == 'json-records':
            d_ = {'orient': 'records'}
            d_.update(extra)
            return pd.read_json(tmp_name, **d_)
        if callable(read_fn):
            return read_fn(tmp_name, **extra)

        return None
class DataResource(object):
    """Context manager that provides a `DataHandler`.

    Entering the context builds a `DataHandler` from the stored
    `StorageInfo` and returns it; leaving the context calls the handler's
    `cleanup()`.

    Attributes:
        handler (DataHandler): The handler created by `__enter__`.
        info (StorageInfo): The storage settings given at construction.
    """

    __slots__ = ('handler', 'info')

    def __init__(self, storage_info: 'StorageInfo'):
        """Store the settings; the handler is only created on entry."""
        self.info = storage_info

    def __enter__(self):
        """Create the `DataHandler` and return it."""
        self.handler = DataHandler(self.info)
        return self.handler

    def __exit__(self, exc_type=None, exc_value=None, traceback=None):
        """Clean up the handler's temporary files.

        The `with` statement always passes the three exception arguments,
        so they must be accepted. Nothing is returned, so an exception
        raised inside the block keeps propagating.
        """
        self.handler.cleanup()