Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CentOS 6 ebs mount support #520

Open
wants to merge 18 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions starcluster/awsutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ def request_instances(self, image_id, price=None, instance_type='m1.small',
availability_zone_group=None, placement=None,
user_data=None, placement_group=None,
block_device_map=None, subnet_id=None,
network_interfaces=None):
network_interfaces=None, iam_profile=None):
"""
Convenience method for running spot or flat-rate instances
"""
Expand Down Expand Up @@ -511,7 +511,8 @@ def request_instances(self, image_id, price=None, instance_type='m1.small',
placement_group=placement_group,
user_data=user_data,
block_device_map=block_device_map,
network_interfaces=network_interfaces)
network_interfaces=network_interfaces,
iam_profile=iam_profile)
if price:
return self.request_spot_instances(
price, image_id,
def request_spot_instances(self, price, image_id, instance_type='m1.small',
                           security_group_ids=None, subnet_id=None,
                           placement=None, placement_group=None,
                           user_data=None, block_device_map=None,
                           network_interfaces=None, iam_profile=None):
    """
    Request spot instances with a maximum bid of `price` for `image_id`.

    All keyword arguments are forwarded to boto's
    EC2Connection.request_spot_instances. Returns the list of spot
    instance requests created by boto.
    """
    # locals() must be captured before any other local is created so
    # that it contains exactly the call's parameters
    kwargs = locals()
    kwargs.pop('self')
    # boto's request_spot_instances has no `iam_profile` kwarg -- it
    # expects `instance_profile_name` (run_instances already performs
    # this same translation); passing iam_profile through unchanged
    # would raise a TypeError
    kwargs['instance_profile_name'] = kwargs.pop('iam_profile')
    return self.conn.request_spot_instances(**kwargs)
Expand Down Expand Up @@ -605,7 +606,7 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1,
max_count=1, key_name=None, security_groups=None,
placement=None, user_data=None, placement_group=None,
block_device_map=None, subnet_id=None,
network_interfaces=None):
network_interfaces=None, iam_profile=None):
kwargs = dict(
instance_type=instance_type,
min_count=min_count,
Expand All @@ -616,7 +617,8 @@ def run_instances(self, image_id, instance_type='m1.small', min_count=1,
user_data=user_data,
placement_group=placement_group,
block_device_map=block_device_map,
network_interfaces=network_interfaces
network_interfaces=network_interfaces,
instance_profile_name=iam_profile
)
if subnet_id:
kwargs.update(
Expand Down
60 changes: 38 additions & 22 deletions starcluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def __repr__(self):
return "<ClusterManager: %s>" % self.ec2.region.name

def get_cluster(self, cluster_name, group=None, load_receipt=True,
load_plugins=True, load_volumes=True, require_keys=True):
load_plugins=True, load_volumes=True, require_keys=True, load_iam_profile=True):
"""
Returns a Cluster object representing an active cluster
"""
Expand All @@ -64,7 +64,8 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True,
cluster_group=group)
if load_receipt:
cl.load_receipt(load_plugins=load_plugins,
load_volumes=load_volumes)
load_volumes=load_volumes,
load_iam_profile=load_iam_profile)
try:
cl.keyname = cl.keyname or cl.master_node.key_name
key_location = self.cfg.get_key(cl.keyname).get('key_location')
Expand All @@ -79,14 +80,15 @@ def get_cluster(self, cluster_name, group=None, load_receipt=True,
except exception.SecurityGroupDoesNotExist:
raise exception.ClusterDoesNotExist(cluster_name)

def get_clusters(self, load_receipt=True, load_plugins=True,
                 load_iam_profile=True):
    """
    Return a list of Cluster objects, one per active cluster security
    group in this region.

    The load_* flags are forwarded to get_cluster and control how much
    of each cluster's launch receipt is loaded.
    """
    cluster_groups = self.get_cluster_security_groups()
    clusters = [self.get_cluster(g.name, group=g,
                                 load_receipt=load_receipt,
                                 load_plugins=load_plugins,
                                 load_iam_profile=load_iam_profile)
                for g in cluster_groups]
    return clusters

Expand Down Expand Up @@ -169,24 +171,27 @@ def _get_cluster_name(self, cluster_name):

def add_node(self, cluster_name, alias=None, no_create=False,
             image_id=None, instance_type=None, zone=None,
             placement_group=None, spot_bid=None,
             iam_profile=None):
    """
    Add a single node to the active cluster named `cluster_name`.

    All settings default to the cluster's own configuration. Set
    no_create=True to configure an existing instance rather than
    launching a new one. Returns whatever Cluster.add_node returns.
    """
    cl = self.get_cluster(cluster_name)
    return cl.add_node(alias=alias, image_id=image_id,
                       instance_type=instance_type, zone=zone,
                       placement_group=placement_group, spot_bid=spot_bid,
                       no_create=no_create, iam_profile=iam_profile)

def add_nodes(self, cluster_name, num_nodes, aliases=None, no_create=False,
              image_id=None, instance_type=None, zone=None,
              placement_group=None, spot_bid=None, iam_profile=None):
    """
    Add one or more nodes to the active cluster named `cluster_name`.

    All settings default to the cluster's own configuration. Set
    no_create=True to configure existing instances rather than
    launching new ones. Returns whatever Cluster.add_nodes returns.
    """
    # NOTE: removed leftover debug prints (`print __file__, ...`,
    # `print iam_profile`) that were committed by mistake
    cl = self.get_cluster(cluster_name)
    return cl.add_nodes(num_nodes, aliases=aliases, image_id=image_id,
                        instance_type=instance_type, zone=zone,
                        placement_group=placement_group, spot_bid=spot_bid,
                        no_create=no_create, iam_profile=iam_profile)

def remove_node(self, cluster_name, alias=None, terminate=True,
force=False):
Expand Down Expand Up @@ -290,7 +295,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False):
tag = self.get_tag_from_sg(scg.name)
try:
cl = self.get_cluster(tag, group=scg, load_plugins=False,
load_volumes=False, require_keys=False)
load_volumes=False, require_keys=False,
load_iam_profile=True)
except exception.IncompatibleCluster as e:
sep = '*' * 60
log.error('\n'.join([sep, e.msg, sep]),
Expand Down Expand Up @@ -319,6 +325,8 @@ def list_clusters(self, cluster_groups=None, show_ssh_status=False):
print 'Subnet: %s' % getattr(n, 'subnet_id', 'N/A')
print 'Zone: %s' % getattr(n, 'placement', 'N/A')
print 'Keypair: %s' % getattr(n, 'key_name', 'N/A')
ipn = cl.iam_profile if cl.iam_profile else 'N/A'
print 'IAM instance profile: %s' % ipn
ebs_vols = []
for node in nodes:
devices = node.attached_vols
Expand Down Expand Up @@ -412,6 +420,7 @@ def __init__(self,
disable_cloudinit=False,
subnet_id=None,
public_ips=None,
iam_profile=None,
**kwargs):
# update class vars with given vars
_vars = locals().copy()
Expand Down Expand Up @@ -523,7 +532,8 @@ def load_volumes(self, vols):
This method assigns the first volume to /dev/sdz, second to /dev/sdy,
etc. for all volumes that do not include a device/partition setting
"""
devices = ['/dev/sd%s' % s for s in string.lowercase]
# devices = ['/dev/sd%s' % s for s in string.lowercase]
devices = ['/dev/xvdb%s' % s for s in string.lowercase]
devmap = {}
for volname in vols:
vol = vols.get(volname)
Expand Down Expand Up @@ -571,7 +581,7 @@ def __str__(self):
cfg = self.__getstate__()
return pprint.pformat(cfg)

def load_receipt(self, load_plugins=True, load_volumes=True):
def load_receipt(self, load_plugins=True, load_volumes=True, load_iam_profile=True):
"""
Load the original settings used to launch this cluster into this
Cluster object. Settings are loaded from cluster group tags and the
Expand All @@ -587,7 +597,7 @@ def load_receipt(self, load_plugins=True, load_volumes=True):
sep = '*' * 60
log.warn('\n'.join([sep, msg, sep]), extra={'__textwrap__': 1})
self.update(self._get_settings_from_tags())
if not (load_plugins or load_volumes):
if not (load_plugins or load_volumes or load_iam_profile):
return True
try:
master = self.master_node
Expand All @@ -603,6 +613,8 @@ def load_receipt(self, load_plugins=True, load_volumes=True):
self.plugins = self.load_plugins(master.get_plugins())
if load_volumes:
self.volumes = master.get_volumes()
if load_iam_profile:
self.iam_profile = master.get_iam_profile()
except exception.PluginError:
log.error("An error occurred while loading plugins: ",
exc_info=True)
Expand Down Expand Up @@ -702,7 +714,8 @@ def _add_tags_to_sg(self, sg):
disable_cloudinit=self.disable_cloudinit)
user_settings = dict(cluster_user=self.cluster_user,
cluster_shell=self.cluster_shell,
keyname=self.keyname, spot_bid=self.spot_bid)
keyname=self.keyname, spot_bid=self.spot_bid,
userdata_scripts=self.userdata_scripts)
core = utils.dump_compress_encode(core_settings, use_json=True,
chunk_size=static.MAX_TAG_LEN)
self._add_chunked_tags(sg, core, static.CORE_TAG)
Expand Down Expand Up @@ -887,11 +900,11 @@ def get_spot_requests_or_raise(self):
return spots

def create_node(self, alias, image_id=None, instance_type=None, zone=None,
                placement_group=None, spot_bid=None, force_flat=False,
                iam_profile=None):
    """
    Request a single instance for this cluster with alias `alias`.

    Thin wrapper around create_nodes; all settings default to this
    cluster's configuration. Returns the first (only) element of the
    create_nodes result.
    """
    return self.create_nodes([alias], image_id=image_id,
                             instance_type=instance_type, zone=zone,
                             placement_group=placement_group,
                             spot_bid=spot_bid, force_flat=force_flat,
                             iam_profile=iam_profile)[0]

def _get_cluster_userdata(self, aliases):
alias_file = utils.string_to_file('\n'.join(['#ignored'] + aliases),
Expand All @@ -913,7 +926,7 @@ def _get_cluster_userdata(self, aliases):

def create_nodes(self, aliases, image_id=None, instance_type=None,
zone=None, placement_group=None, spot_bid=None,
force_flat=False):
force_flat=False, iam_profile=None):
"""
Convenience method for requesting instances with this cluster's
settings. All settings (kwargs) except force_flat default to cluster
Expand All @@ -938,14 +951,16 @@ def create_nodes(self, aliases, image_id=None, instance_type=None,
image_id = image_id or self.node_image_id
count = len(aliases) if not spot_bid else 1
user_data = self._get_cluster_userdata(aliases)
iam_profile = iam_profile or self.iam_profile
kwargs = dict(price=spot_bid, instance_type=instance_type,
min_count=count, max_count=count, count=count,
key_name=self.keyname,
availability_zone_group=cluster_sg,
launch_group=cluster_sg,
placement=zone or getattr(self.zone, 'name', None),
user_data=user_data,
placement_group=placement_group)
placement_group=placement_group,
iam_profile=iam_profile)
if self.subnet_id:
netif = self.ec2.get_network_spec(
device_index=0, associate_public_ip_address=self.public_ips,
Expand Down Expand Up @@ -985,19 +1000,20 @@ def _get_next_node_num(self):

def add_node(self, alias=None, no_create=False, image_id=None,
             instance_type=None, zone=None, placement_group=None,
             spot_bid=None, iam_profile=None):
    """
    Add a single node to this cluster.

    When `alias` is None, add_nodes picks the next available alias.
    Set no_create=True to configure an existing instance rather than
    launching a new one. Returns whatever add_nodes returns.
    """
    # Delegate to add_nodes with a one-element alias list (or None to
    # let add_nodes auto-generate the alias)
    aliases = [alias] if alias else None
    return self.add_nodes(1, aliases=aliases, image_id=image_id,
                          instance_type=instance_type, zone=zone,
                          placement_group=placement_group,
                          spot_bid=spot_bid, no_create=no_create,
                          iam_profile=iam_profile)

def add_nodes(self, num_nodes, aliases=None, image_id=None,
instance_type=None, zone=None, placement_group=None,
spot_bid=None, no_create=False):
spot_bid=None, no_create=False, iam_profile=None):
"""
Add new nodes to this cluster

Expand Down Expand Up @@ -1030,7 +1046,7 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None,
resp = self.create_nodes(aliases, image_id=image_id,
instance_type=instance_type, zone=zone,
placement_group=placement_group,
spot_bid=spot_bid)
spot_bid=spot_bid, iam_profile=iam_profile)
if spot_bid or self.spot_bid:
self.ec2.wait_for_propagation(spot_requests=resp)
else:
Expand Down Expand Up @@ -1665,7 +1681,7 @@ def _setup_cluster(self):
Runs the default StarCluster setup routines followed by any additional
plugin setup routines. Does not wait for nodes to come up.
"""
log.info("The master node is %s" % self.master_node.dns_name)
log.info("The master node is %s" % self.master_node.private_ip_address)
log.info("Configuring cluster...")
if self.volumes:
self.attach_volumes_to_master()
Expand Down
12 changes: 12 additions & 0 deletions starcluster/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ def get_volumes(self):
payload = volstxt.split('\n', 2)[2]
return utils.decode_uncompress_load(payload)

def get_iam_profile(self):
    """
    Return the name of this instance's IAM instance profile, or None.

    The name is extracted from the profile's ARN
    (arn:aws:iam::<account>:instance-profile/<name>). Returns None when
    the instance has no profile attached, or when the ARN does not
    match the expected format (the original code raised AttributeError
    on a non-matching ARN because re.match returned None).
    """
    profile = self.instance.instance_profile
    if not profile:
        return None
    arn = profile['arn']
    match = re.match(r'arn:aws:iam::\d{12}:instance-profile/(\S+)', arn)
    return match.group(1) if match else None

def _remove_all_tags(self):
tags = self.tags.keys()[:]
for t in tags:
Expand Down Expand Up @@ -237,6 +245,10 @@ def memory(self):
"free -m | grep -i mem | awk '{print $2}'")[0])
return self._memory

@property
def instance_profile(self):
    """The raw instance-profile value reported by boto for this
    node's underlying instance (may be None when no profile is
    attached -- TODO confirm against boto)."""
    profile = self.instance.instance_profile
    return profile

@property
def ip_address(self):
return self.instance.ip_address
Expand Down
19 changes: 19 additions & 0 deletions starcluster/plugins/mount_ephemeral.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import os
from starcluster.clustersetup import ClusterSetup
from starcluster.logger import log

class MountEphemeralPlugin(ClusterSetup):
    """StarCluster plugin that sets up ephemeral (instance-store)
    storage on every node by uploading and running the
    mount_ephemeral.sh helper script shipped alongside this module.
    """

    def __init__(self):
        # Directory containing this plugin module; the helper shell
        # script lives next to it
        self.plugin_dir = os.path.dirname(os.path.realpath(__file__))

    def mountEphemeralStorage(self, node):
        """Copy the mount helper script to `node` and execute it."""
        log.info("Mounting ephemeral storage on %s" % node.alias)
        # os.path.join instead of string concatenation for the source path
        script = os.path.join(self.plugin_dir, "mount_ephemeral.sh")
        node.ssh.put(script, ".")
        node.ssh.execute("sh ./mount_ephemeral.sh")

    def run(self, nodes, master, user, user_shell, volumes):
        # Initial cluster setup: mount ephemeral storage on every node
        for node in nodes:
            self.mountEphemeralStorage(node)

    def on_add_node(self, node, nodes, master, user, user_shell, volumes):
        # Nodes added later get the same treatment
        self.mountEphemeralStorage(node)
30 changes: 30 additions & 0 deletions starcluster/plugins/mount_ephemeral.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#!/bin/bash
# Discover all ephemeral (instance-store) devices via the EC2 instance
# metadata service, combine them into a single LVM volume group, and
# mount the resulting logical volume world-writable on /scratch.

METADATA="169.254.169.254/latest/meta-data/block-device-mapping"

VOLUMES=""
for device in $(curl -s "$METADATA/")
do
    # Instance-store mappings are named ephemeral0, ephemeral1, ...
    if [[ "$device" == ephemeral* ]]
    then
        # The metadata maps e.g. ephemeral0 -> sdb; keep only the last
        # path component as the kernel device name
        block=$(curl -s "$METADATA/$device" | awk -F/ '{print $NF}')
        if [[ -e "/dev/$block" ]]
        then
            pvcreate "/dev/$block"
            VOLUMES="${VOLUMES} /dev/$block"
        fi
    fi
done

# Some instance types have no ephemeral drives at all; bail out cleanly
# instead of running vgcreate with no physical volumes.
if [ -z "$VOLUMES" ]
then
    echo "No ephemeral devices found; skipping /scratch setup"
    exit 0
fi

# $VOLUMES is intentionally unquoted: it is a space-separated list of
# device paths that must word-split into separate arguments.
vgcreate vg_ephemeral $VOLUMES
# Allocate every physical extent in the group to a single logical volume
SIZE=$(vgdisplay vg_ephemeral | grep "Total PE" | awk '{print $3}')
lvcreate -l "$SIZE" vg_ephemeral -n ephemerallv
mkfs.ext3 /dev/mapper/vg_ephemeral-ephemerallv
mkdir /scratch
mount /dev/mapper/vg_ephemeral-ephemerallv /scratch
chmod 1777 /scratch
15 changes: 15 additions & 0 deletions starcluster/plugins/tagger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from starcluster.clustersetup import ClusterSetup
from starcluster.logger import log

class TaggerPlugin(ClusterSetup):
    """StarCluster plugin that applies user-configured EC2 tags to every
    node in the cluster.

    `tags` is a comma-separated list of key=value pairs from the config,
    e.g. "project=foo, owner=bar".
    """

    def __init__(self, tags):
        # Parse "k=v, k=v" into a dict; whitespace around each pair is
        # stripped. A value containing '=' is not supported (split would
        # yield more than two fields), same as the original parser.
        self.tags = dict(t.strip().split('=') for t in tags.split(','))

    def run(self, nodes, master, user, user_shell, volumes):
        log.info("Tagging all nodes...")
        # iterate key/value pairs directly instead of keys + re-lookup
        for tag, val in self.tags.items():
            log.info("Applying tag - %s: %s" % (tag, val))
            for node in nodes:
                node.add_tag(tag, val)
    # NOTE(review): nodes added later via add_node are never tagged --
    # consider implementing on_add_node as well
1 change: 1 addition & 0 deletions starcluster/static.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,4 +302,5 @@ def create_sc_config_dirs():
'force_spot_master': (bool, False, False, None, None),
'disable_cloudinit': (bool, False, False, None, None),
'dns_prefix': (bool, False, False, None, None),
'iam_profile': (str, False, None, None, None),
}
3 changes: 2 additions & 1 deletion starcluster/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ def is_valid_device(dev):
Checks that dev matches the following regular expression:
/dev/xvdb[a-z]$
"""
regex = re.compile('/dev/sd[a-z]$')
# regex = re.compile('/dev/sd[a-z]$')
regex = re.compile('/dev/xvdb[a-z]$')
try:
return regex.match(dev) is not None
except TypeError:
Expand Down