ceph-iscsi: add erasure pool support
Erasure coded pools do not support omap, so only the image data is
stored in the EC pool; the image metadata lives in the replicated 'rbd' pool.

Signed-off-by: Xiubo Li <[email protected]>
lxbsz committed Jun 21, 2021
1 parent 45f7dd4 commit e61a60d
Showing 4 changed files with 150 additions and 24 deletions.
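
For context before the diffs: the change builds on RBD's separate data-pool feature. The image header and its omap-backed metadata are created in a replicated pool, while the image's data objects are placed in the erasure coded pool via the data_pool argument. A minimal sketch of that call, assuming a replicated 'rbd' pool and an EC pool named 'ecpool' (both names are illustrative):

    import rados
    import rbd

    with rados.Rados(conffile='/etc/ceph/ceph.conf') as cluster:
        # Open the ioctx against the replicated pool: EC pools cannot
        # store omap, so the image header/metadata must live here.
        with cluster.open_ioctx('rbd') as ioctx:
            rbd.RBD().create(ioctx,
                             'disk_1',            # image name (illustrative)
                             10 * 1024 ** 3,      # 10 GiB
                             old_format=False,
                             data_pool='ecpool')  # data objects go to the EC pool

The equivalent CLI form is `rbd create --size 10G --data-pool ecpool rbd/disk_1`.
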
92 changes: 79 additions & 13 deletions ceph_iscsi_config/lun.py
@@ -1,3 +1,4 @@
import json
import rados
import rbd
import re
@@ -13,8 +14,9 @@
from ceph_iscsi_config.backstore import USER_RBD
from ceph_iscsi_config.utils import (convert_2_bytes, gen_control_string,
valid_size, get_pool_id, ip_addresses,
get_pools, get_rbd_size, this_host,
human_size, CephiSCSIError)
get_pools, get_rbd_size, run_shell_cmd,
human_size, CephiSCSIError, this_host,
is_erasure_pool)
from ceph_iscsi_config.gateway_object import GWObject
from ceph_iscsi_config.target import GWTarget
from ceph_iscsi_config.client import GWClient, CHAP
@@ -46,7 +48,7 @@ class RBDDev(object):
]
}

def __init__(self, image, size, backstore, pool=None):
def __init__(self, image, size, backstore, pool=None, is_erasure=False):
self.image = image
self.size_bytes = convert_2_bytes(size)
self.backstore = backstore
@@ -58,6 +60,8 @@ def __init__(self, image, size, backstore, pool=None):
self.error_msg = ''
self.changed = False

self.is_erasure_pool = is_erasure_pool(settings, pool)

def create(self):
"""
Create an rbd image compatible with exporting through LIO to multiple
@@ -67,14 +71,20 @@ def create(self):

with rados.Rados(conffile=settings.config.cephconf,
name=settings.config.cluster_client_name) as cluster:
with cluster.open_ioctx(self.pool) as ioctx:
_pool = self.pool
data_pool = None
if self.is_erasure_pool:
_pool = settings.config.pool
data_pool = self.pool
with cluster.open_ioctx(_pool) as ioctx:
rbd_inst = rbd.RBD()
try:
rbd_inst.create(ioctx,
self.image,
self.size_bytes,
features=RBDDev.default_features(self.backstore),
old_format=False)
old_format=False,
data_pool=data_pool)

except (rbd.ImageExists, rbd.InvalidArgument) as err:
self.error = True
@@ -95,7 +105,10 @@ def delete(self):

with rados.Rados(conffile=settings.config.cephconf,
name=settings.config.cluster_client_name) as cluster:
with cluster.open_ioctx(self.pool) as ioctx:
_pool = self.pool
if self.is_erasure_pool:
_pool = settings.config.pool
with cluster.open_ioctx(_pool) as ioctx:
rbd_inst = rbd.RBD()

ctr = 0
@@ -139,7 +152,10 @@ def rbd_size(self):

with rados.Rados(conffile=settings.config.cephconf,
name=settings.config.cluster_client_name) as cluster:
with cluster.open_ioctx(self.pool) as ioctx:
_pool = self.pool
if self.is_erasure_pool:
_pool = settings.config.pool
with cluster.open_ioctx(_pool) as ioctx:
with rbd.Image(ioctx, self.image) as rbd_image:

# get the current size in bytes
@@ -166,7 +182,10 @@ def _get_size_bytes(self):

with rados.Rados(conffile=settings.config.cephconf,
name=settings.config.cluster_client_name) as cluster:
with cluster.open_ioctx(self.pool) as ioctx:
_pool = self.pool
if self.is_erasure_pool:
_pool = settings.config.pool
with cluster.open_ioctx(_pool) as ioctx:
with rbd.Image(ioctx, self.image) as rbd_image:
image_size = rbd_image.size()

@@ -186,7 +205,11 @@ def rbd_list(conf=None, pool=None):
pool = settings.config.pool

with rados.Rados(conffile=conf, name=settings.config.cluster_client_name) as cluster:
with cluster.open_ioctx(pool) as ioctx:
is_erasure = is_erasure_pool(settings, pool)
_pool = pool
if is_erasure:
_pool = settings.config.pool
with cluster.open_ioctx(_pool) as ioctx:
rbd_inst = rbd.RBD()
rbd_names = rbd_inst.list(ioctx)
return rbd_names
@@ -222,7 +245,10 @@ def _valid_rbd(self):
valid_state = True
with rados.Rados(conffile=settings.config.cephconf,
name=settings.config.cluster_client_name) as cluster:
ioctx = cluster.open_ioctx(self.pool)
_pool = self.pool
if self.is_erasure_pool:
_pool = settings.config.pool
ioctx = cluster.open_ioctx(_pool)
with rbd.Image(ioctx, self.image) as rbd_image:

if rbd_image.features() & RBDDev.required_features(self.backstore) != \
@@ -290,7 +316,7 @@ class LUN(GWObject):
}

def __init__(self, logger, pool, image, size, allocating_host,
backstore, backstore_object_name):
backstore, backstore_object_name, is_erasure=False):
self.logger = logger
self.image = image
self.pool = pool
@@ -306,6 +332,8 @@ def __init__(self, logger, pool, image, size, allocating_host,
self.error_msg = ''
self.num_changes = 0

self.is_erasure_pool = is_erasure_pool(settings, pool)

try:
super(LUN, self).__init__('disks', self.config_key, logger,
LUN.SETTINGS[self.backstore])
@@ -574,6 +602,33 @@ def activate(self):
if client_err:
raise CephiSCSIError(client_err)

def _erasure_pool_check(self):
if not self.is_erasure_pool:
return None

data, err = run_shell_cmd(
"ceph -n {client_name} --conf {cephconf} osd metadata --format=json".
format(client_name=settings.config.cluster_client_name,
cephconf=settings.config.cephconf))
if err:
self.logger.error("Cannot get the objectstore type")
return err
store_type = json.loads(data)[0]['osd_objectstore']
self.logger.debug(f"pool ({self.pool}) objectstore type is ({store_type})")
if store_type not in ['bluestore', 'filestore']:
err = "Only bluestore/filestore objectstores allowed for erasure pool"
self.logger.error(err)
return err

data, err = run_shell_cmd(
"ceph -n {client_name} --conf {cephconf} osd pool get {pool} allow_ec_overwrites".
format(client_name=settings.config.cluster_client_name,
cephconf=settings.config.cephconf, pool=self.pool))
if err:
self.logger.error(f"Cannot get allow_ec_overwrites from pool ({self.pool})")
return err
self.logger.debug(f"erasure pool ({self.pool}) allow_ec_overwrites is enabled")
return None

def allocate(self, keep_dev_in_lio=True, in_wwn=None):
"""
Create image and add to LIO and config.
@@ -583,6 +638,10 @@ def allocate(self, keep_dev_in_lio=True, in_wwn=None):
:return: LIO storage object if successful and keep_dev_in_lio=True
else None.
"""
err = self._erasure_pool_check()
if err:
return None

self.logger.debug("LUN.allocate starting, listing rbd devices")
disk_list = RBDDev.rbd_list(pool=self.pool)
self.logger.debug("rados pool '{}' contains the following - "
@@ -888,7 +947,10 @@ def _add_dev_to_lio_user_rbd(self, in_wwn=None):
try:
# config string = rbd identifier / config_key (pool/image) /
# optional osd timeout
cfgstring = "rbd/{}/{};osd_op_timeout={}".format(self.pool,
_pool = self.pool
if self.is_erasure_pool:
_pool = settings.config.pool
cfgstring = "rbd/{}/{};osd_op_timeout={}".format(_pool,
self.image,
self.osd_op_timeout)
if (settings.config.cephconf != '/etc/ceph/ceph.conf'):
@@ -1224,7 +1286,11 @@ def define_luns(logger, config, target):

logger.debug("Processing rbd's in '{}' pool".format(pool))

with cluster.open_ioctx(pool) as ioctx:
is_erasure = is_erasure_pool(settings, pool)
_pool = pool
if is_erasure:
_pool = settings.config.pool
with cluster.open_ioctx(_pool) as ioctx:

pool_disks = [disk_key for disk_key in srtd_disks
if disk_key.startswith(pool + '/')]
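
All the lun.py hunks above apply one recurring pattern: when the configured pool is erasure coded, every rados I/O context is redirected to the replicated pool named in the gateway settings, because that is where the image metadata actually lives. Distilled into a hypothetical helper (not part of the commit; names follow the diff):

    import ceph_iscsi_config.settings as settings

    def metadata_pool(pool, is_erasure):
        # omap-backed image metadata cannot live in an EC pool, so all
        # ioctx opens are redirected to the replicated settings pool.
        return settings.config.pool if is_erasure else pool

    # i.e. every 'with cluster.open_ioctx(self.pool)' in the hunks above
    # becomes 'with cluster.open_ioctx(metadata_pool(self.pool,
    # self.is_erasure_pool))'.
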
18 changes: 18 additions & 0 deletions ceph_iscsi_config/utils.py
Original file line number Diff line number Diff line change
@@ -6,6 +7,7 @@
import re
import datetime
import os
import json

import ceph_iscsi_config.settings as settings

@@ -38,6 +39,23 @@ def run_shell_cmd(cmd, stderr=None, shell=True):
return result, None


def is_erasure_pool(settings, pool):
data, err = run_shell_cmd(
"ceph -n {client_name} --conf {cephconf} osd dump --format=json".
format(client_name=settings.config.cluster_client_name,
cephconf=settings.config.cephconf))
if err:
raise CephiSCSIError("Erasure pool check failed {}".format(err))

for _pool in json.loads(data)['pools']:
if _pool['pool_name'] == pool:
# 3 is erasure pool
if _pool['type'] != 3:
return False
return True
raise CephiSCSIError("Pool ({}) not found".format(pool))


def normalize_ip_address(ip_address):
"""
IPv6 addresses should not include the square brackets utilized by
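
is_erasure_pool() keys off the numeric pool type reported by 'ceph osd dump --format=json', where replicated pools report type 1 and erasure coded pools type 3. A self-contained sketch of the same check against a trimmed, illustrative payload:

    import json

    # Trimmed example of 'ceph osd dump --format=json' output; the pool
    # names are illustrative, but 'type' 1 = replicated, 3 = erasure.
    sample = ('{"pools": [{"pool_name": "rbd", "type": 1},'
              ' {"pool_name": "ecpool", "type": 3}]}')

    for p in json.loads(sample)['pools']:
        print(p['pool_name'], 'erasure' if p['type'] == 3 else 'replicated')
    # rbd replicated
    # ecpool erasure
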
55 changes: 46 additions & 9 deletions gwcli/storage.py
Original file line number Diff line number Diff line change
@@ -53,6 +69,22 @@ def __init__(self, parent):
self.scan_queue = None
self.scan_mutex = None

def _get_pool_type(self, pool):
root = self.get_ui_root()
pools = root.ceph.cluster.pools
pool_object = pools.pool_lookup.get(pool, None)
if pool_object:
return pool_object.type
return None

def _get_erasure_image_id(self, image_id):
_pool, _rbd_image = image_id.split('/', 1)
_type = self._get_pool_type(_pool)
if _type and _type == 'erasure':
return '/'.join([settings.config.pool, _rbd_image])
else:
return image_id

def _get_disk_meta(self, cluster_ioctx, disk_meta):
"""
Use the provided cluster context to take an rbd image name from the
@@ -114,7 +130,9 @@ def refresh(self, disk_info):

# Load the queue
for disk_name in disk_info.keys():
self.scan_queue.put(disk_name)
self.logger.debug(f"----- {disk_name}")
_disk_name = self._get_erasure_image_id(disk_name)
self.scan_queue.put(_disk_name)

start_time = int(time.time())
scan_threads = []
@@ -277,12 +295,12 @@ def _valid_pool(self, pool=None):
pools = root.ceph.cluster.pools
pool_object = pools.pool_lookup.get(pool, None)
if pool_object:
if pool_object.type == 'replicated':
self.logger.debug("pool '{}' is ok to use".format(pool))
if pool_object.type in ['replicated', 'erasure']:
self.logger.debug(f"pool '{pool}' is ok to use")
return True

self.logger.error("Invalid pool ({}). Must already exist and "
"be replicated".format(pool))
self.logger.error(f"Invalid pool ({pool}), the type is ({pool_object.type})."
" Must already exist and be erasure or replicated")
return False

def create_disk(self, pool=None, image=None, size=None, count=1,
@@ -603,12 +621,32 @@ def __init__(self, parent, pool, pool_disks_config, disks_meta=None):
self.disks_meta = disks_meta
self.refresh()

def _get_pool_type(self, pool):
root = self.get_ui_root()
pools = root.ceph.cluster.pools
pool_object = pools.pool_lookup.get(pool, None)
if pool_object:
return pool_object.type
return None

def _get_erasure_image_id(self, image_id):
_pool, _rbd_image = image_id.split('/', 1)
_type = self._get_pool_type(_pool)
if _type and _type == 'erasure':
return '/'.join([settings.config.pool, _rbd_image])
else:
return image_id

def refresh(self):
for pool_disk_config in self.pool_disks_config:
disk_id = '{}/{}'.format(pool_disk_config['pool'], pool_disk_config['image'])
size = self.disks_meta[disk_id].get('size', 0) if self.disks_meta else None
features = self.disks_meta[disk_id].get('features', 0) if self.disks_meta else None
snapshots = self.disks_meta[disk_id].get('snapshots', []) if self.disks_meta else None
self.logger.debug(f"disk_id = {disk_id}")
self.logger.debug(f"disk_meta = {self.disks_meta}")
_disk_id = self._get_erasure_image_id(disk_id)
size = self.disks_meta[_disk_id].get('size', 0) if self.disks_meta else None
self.logger.debug(f"disk_id = {disk_id}, size = {size}")
features = self.disks_meta[_disk_id].get('features', 0) if self.disks_meta else None
snapshots = self.disks_meta[_disk_id].get('snapshots', []) if self.disks_meta else None
Disk(self,
image_id=disk_id,
image_config=pool_disk_config,
@@ -736,7 +774,6 @@ def summary(self):
'disk/{}'.format(self.http_mode, settings.config.api_port,
self.image_id))

self.logger.debug("disk GET status for {}".format(self.image_id))
api = APIRequest(disk_api)
api.get()

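
_get_erasure_image_id(), duplicated in both UI classes above, rewrites only the pool half of a 'pool/image' key so that metadata scans target the replicated pool holding the image header. A standalone sketch of the expected mapping (pool names are illustrative; the real method looks the pool type up via the UI tree):

    def erasure_image_id(image_id, pool_type, metadata_pool='rbd'):
        # Hypothetical stand-in for _get_erasure_image_id: remap only
        # when the disk's pool is erasure coded.
        pool, rbd_image = image_id.split('/', 1)
        if pool_type == 'erasure':
            return '/'.join([metadata_pool, rbd_image])
        return image_id

    assert erasure_image_id('ecpool/disk_1', 'erasure') == 'rbd/disk_1'
    assert erasure_image_id('rbd/disk_1', 'replicated') == 'rbd/disk_1'
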
9 changes: 7 additions & 2 deletions rbd-target-api.py
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@
from ceph_iscsi_config.common import Config
from ceph_iscsi_config.utils import (normalize_ip_literal, resolve_ip_addresses,
ip_addresses, read_os_release, encryption_available,
CephiSCSIError, this_host)
CephiSCSIError, this_host, is_erasure_pool)
from ceph_iscsi_config.device_status import DeviceStatusWatcher

from gwcli.utils import (APIRequest, valid_gateway, valid_client,
@@ -1007,6 +1007,11 @@ def disk(pool, image):
logger.debug("this host is {}".format(local_gw))

image_id = '{}/{}'.format(pool, image)
is_erasure = is_erasure_pool(settings, pool)
if is_erasure:
_image_id = '/'.join([settings.config.pool, image])
else:
_image_id = image_id

config.refresh()

@@ -1015,7 +1020,7 @@
if image_id in config.config['disks']:
disk_dict = config.config["disks"][image_id]
global dev_status_watcher
disk_status = dev_status_watcher.get_dev_status(image_id)
disk_status = dev_status_watcher.get_dev_status(_image_id)
if disk_status:
disk_dict['status'] = disk_status.get_status_dict()
else:
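
The net effect in the API hunk above: the user-facing '{pool}/{image}' string remains the key in the gateway config, while the device-status watcher is queried under the remapped id, mirroring the gwcli change. For a hypothetical EC-backed disk:

    pool, image = 'ecpool', 'disk_1'         # illustrative names
    image_id = '{}/{}'.format(pool, image)   # 'ecpool/disk_1' -> config key
    _image_id = '/'.join(['rbd', image])     # 'rbd/disk_1'    -> status-watcher key
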
