Skip to content

Commit

Permalink
Merge pull request #666 from Breakthrough-Energy/jen/blob
Browse files Browse the repository at this point in the history
Enable loading any scenario from blob storage
  • Loading branch information
jenhagg authored Aug 17, 2022
2 parents db36fcf + c51d095 commit 02dc955
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 23 deletions.
12 changes: 7 additions & 5 deletions powersimdata/data_access/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,20 @@ class Context:
"""Factory for data access instances"""

@staticmethod
def get_data_access():
def get_data_access(make_fs=None):
"""Return a data access instance appropriate for the current
environment.
:param callable make_fs: a function that returns a filesystem instance, or
None to use a default
:return: (:class:`powersimdata.data_access.data_access.DataAccess`) -- a data access
instance
"""
root = server_setup.DATA_ROOT_DIR

if server_setup.DEPLOYMENT_MODE == DeploymentMode.Server:
return SSHDataAccess(root)
return LocalDataAccess(root)
if make_fs is None:
make_fs = lambda: None # noqa: E731
return SSHDataAccess(make_fs())
return LocalDataAccess()

@staticmethod
def get_launcher(scenario):
Expand Down
24 changes: 12 additions & 12 deletions powersimdata/data_access/data_access.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
class DataAccess:
"""Interface to a local or remote data store."""

def __init__(self, root):
def __init__(self):
"""Constructor"""
self.root = root
self.join = fs.path.join
self.local_fs = None

Expand Down Expand Up @@ -163,10 +162,10 @@ def push(self, file_name, checksum):
class LocalDataAccess(DataAccess):
"""Interface to shared data volume"""

def __init__(self, root=server_setup.LOCAL_DIR):
super().__init__(root)
self.local_fs = fs.open_fs(root)
self.fs = self._get_fs()
def __init__(self, _fs=None):
super().__init__()
self.local_fs = fs.open_fs(server_setup.LOCAL_DIR)
self.fs = _fs if _fs is not None else self._get_fs()

def _get_fs(self):
mfs = MultiFS()
Expand All @@ -193,18 +192,19 @@ def push(self, file_name, checksum):
class SSHDataAccess(DataAccess):
"""Interface to a remote data store, accessed via SSH."""

def __init__(self, root=server_setup.DATA_ROOT_DIR):
def __init__(self, _fs=None):
"""Constructor"""
super().__init__(root)
self._fs = None
super().__init__()
self.root = server_setup.DATA_ROOT_DIR
self._fs = _fs
self.local_fs = fs.open_fs(server_setup.LOCAL_DIR)

@property
def fs(self):
"""Get or create the filesystem object
"""Get or create a filesystem object, defaulting to a MultiFS that combines the
server and blob containers.
:raises IOError: if connection failed or still within retry window
:return: (*fs.multifs.MultiFS*) -- filesystem instance
:return: (*fs.base.FS*) -- filesystem instance
"""
if self._fs is None:
self._fs = get_multi_fs(self.root)
Expand Down
24 changes: 22 additions & 2 deletions powersimdata/data_access/fs_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ def get_blob_fs(container):
:param str container: the container name
:return: (*fs.base.FS*) -- filesystem instance
"""
account = "besciences"
return fs.open_fs(f"azblob://{account}@{container}")
account = "esmi"
sas_token = server_setup.BLOB_TOKEN_RO
return fs.open_fs(f"azblobv2://{account}:{sas_token}@{container}")


def get_ssh_fs(root=""):
Expand Down Expand Up @@ -49,3 +50,22 @@ def get_multi_fs(root):
remotes = ",".join([f[0] for f in mfs.iterate_fs()])
print(f"Initialized remote filesystem with {remotes}")
return mfs


def get_scenario_fs():
    """Create filesystem combining the server (if connected) with blob storage,
    prioritizing the server if connected.

    The blob container is always added. The ssh filesystem is added on a
    best-effort basis: if it cannot be created, a message is printed and the
    returned filesystem contains blob storage only.

    :return: (*fs.base.FS*) -- filesystem instance
    """
    scenario_data = get_blob_fs("scenariodata")
    mfs = MultiFS()
    try:
        ssh_fs = get_ssh_fs(server_setup.DATA_ROOT_DIR)
    except Exception:
        # Deliberately broad: any failure to reach the server falls back to
        # blob storage. ``Exception`` (rather than a bare ``except``) lets
        # KeyboardInterrupt and SystemExit propagate instead of being reported
        # as a connection failure.
        print("Could not connect to ssh server")
    else:
        # Higher priority than the blob container, and the only writable member.
        mfs.add_fs("ssh_fs", ssh_fs, write=True, priority=2)
    mfs.add_fs("scenario_fs", scenario_data, priority=1)
    remotes = ",".join([f[0] for f in mfs.iterate_fs()])
    print(f"Initialized remote filesystem with {remotes}")
    return mfs
3 changes: 1 addition & 2 deletions powersimdata/input/input_base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from powersimdata.data_access.context import Context
from powersimdata.utility.helpers import MemoryCache, cache_key

_cache = MemoryCache()
Expand All @@ -11,7 +10,7 @@ class InputBase:

def __init__(self):
"""Constructor."""
self.data_access = Context.get_data_access()
self.data_access = None
self._file_extension = {}

def _check_field(self, field_name):
Expand Down
3 changes: 3 additions & 0 deletions powersimdata/input/input_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import pandas as pd

from powersimdata.data_access.context import Context
from powersimdata.data_access.fs_helper import get_scenario_fs
from powersimdata.input.input_base import InputBase
from powersimdata.utility import server_setup

Expand All @@ -13,6 +15,7 @@ class InputData(InputBase):
def __init__(self):
super().__init__()
self._file_extension = {"ct": "pkl", "grid": "mat"}
self.data_access = Context.get_data_access(get_scenario_fs)

def _get_file_path(self, scenario_info, field_name):
"""Get the path to either grid or ct for the scenario
Expand Down
3 changes: 3 additions & 0 deletions powersimdata/input/profile_input.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pandas as pd

from powersimdata.data_access.context import Context
from powersimdata.data_access.fs_helper import get_blob_fs
from powersimdata.input.input_base import InputBase

profile_kind = {
Expand Down Expand Up @@ -39,6 +41,7 @@ class ProfileInput(InputBase):
def __init__(self):
super().__init__()
self._file_extension = {k: "csv" for k in profile_kind}
self.data_access = Context.get_data_access(lambda: get_blob_fs("profiles"))

def _get_file_path(self, scenario_info, field_name):
"""Get the path to the specified profile
Expand Down
3 changes: 2 additions & 1 deletion powersimdata/output/output_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from scipy.sparse import coo_matrix

from powersimdata.data_access.context import Context
from powersimdata.data_access.fs_helper import get_scenario_fs
from powersimdata.input.input_data import distribute_demand_from_zones_to_buses
from powersimdata.input.transform_profile import TransformProfile
from powersimdata.utility import server_setup
Expand All @@ -13,7 +14,7 @@ class OutputData:

def __init__(self):
"""Constructor"""
self._data_access = Context.get_data_access()
self._data_access = Context.get_data_access(get_scenario_fs)

def get_data(self, scenario_id, field_name):
"""Returns data either from server or from local directory.
Expand Down
3 changes: 2 additions & 1 deletion powersimdata/scenario/scenario.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from powersimdata.data_access.context import Context
from powersimdata.data_access.execute_list import ExecuteListManager
from powersimdata.data_access.fs_helper import get_scenario_fs
from powersimdata.data_access.scenario_list import ScenarioListManager
from powersimdata.scenario.analyze import Analyze
from powersimdata.scenario.create import Create, _Builder
Expand Down Expand Up @@ -52,7 +53,7 @@ def __init__(self, descriptor=None):
if descriptor is not None and not isinstance(descriptor, str):
raise TypeError("Descriptor must be a string or int (for a Scenario ID)")

self.data_access = Context.get_data_access()
self.data_access = Context.get_data_access(get_scenario_fs)
self._scenario_list_manager = ScenarioListManager(self.data_access)
self._execute_list_manager = ExecuteListManager(self.data_access)

Expand Down
1 change: 1 addition & 0 deletions powersimdata/utility/server_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
MODEL_DIR = config.MODEL_DIR
ENGINE_DIR = config.ENGINE_DIR
DEPLOYMENT_MODE = get_deployment_mode()
# Shared access signature (SAS) query string interpolated into the blob storage
# URL built by fs_helper.get_blob_fs.
# NOTE(review): this credential is committed in source. The `sp=rl` field looks
# like read/list-only permissions and `se=` like an expiry in 2050 -- confirm
# both against the storage account, and consider loading the token from
# configuration or the environment so it can be rotated without a release.
BLOB_TOKEN_RO = "?sv=2021-06-08&ss=b&srt=co&sp=rl&se=2050-08-06T01:31:08Z&st=2022-08-05T17:31:08Z&spr=https&sig=ORHiRQQCocyaHXV2phhSN92GFhRnaHuGOecskxsmG3U%3D"

os.makedirs(LOCAL_DIR, exist_ok=True)

Expand Down

0 comments on commit 02dc955

Please sign in to comment.