Skip to content

Commit

Permalink
Separating concern of token from backup identifier to enable more tha…
Browse files Browse the repository at this point in the history
…n one token.
  • Loading branch information
zmarois committed Nov 27, 2017
1 parent b482ce2 commit 0712a9e
Show file tree
Hide file tree
Showing 30 changed files with 320 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,18 @@ private void setPriamProperties()
String token = null;
String seeds = null;
boolean isReplace = false;
boolean isExternallyDefinedToken = false;
String replacedIp = "";
String extraEnvParams = null;

while (true)
{
try
{
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
isExternallyDefinedToken = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_externally_defined_token"));
if (isExternallyDefinedToken) {
token = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_token");
}
seeds = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_seeds");
isReplace = Boolean.parseBoolean(DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/is_replace_token"));
replacedIp = DataFetcher.fetchData("http://127.0.0.1:8080/Priam/REST/v1/cassconfig/get_replaced_ip");
Expand All @@ -64,7 +68,7 @@ private void setPriamProperties()
e.printStackTrace();
}

if (token != null && seeds != null)
if ((token != null || !isExternallyDefinedToken) && seeds != null)
break;
try
{
Expand All @@ -75,8 +79,10 @@ private void setPriamProperties()
// do nothing.
}
}

System.setProperty("cassandra.initial_token", token);

if (isExternallyDefinedToken) {
System.setProperty("cassandra.initial_token", token);
}

setExtraEnvParams(extraEnvParams);

Expand Down
5 changes: 5 additions & 0 deletions priam/src/main/java/com/netflix/priam/IConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,11 @@ public interface IConfiguration {
*/
public String getHostIP();

/**
* @return Gets the number of tokens assigned to the node when using virtual nodes.
*/
public int getNumTokens();

/**
* @return Bytes per second to throttle for backups
*/
Expand Down
2 changes: 1 addition & 1 deletion priam/src/main/java/com/netflix/priam/PriamServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public PriamServer(IConfiguration config, PriamScheduler scheduler, InstanceIden
}

public void intialize() throws Exception {
if (id.getInstance().isOutOfService())
if (id.isOutOfService())
return;

// start to schedule jobs
Expand Down
10 changes: 5 additions & 5 deletions priam/src/main/java/com/netflix/priam/aws/S3BackupPath.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public String getRemotePath() {
buff.append(baseDir).append(S3BackupPath.PATH_SEP); // Base dir
buff.append(region).append(S3BackupPath.PATH_SEP);
buff.append(clusterName).append(S3BackupPath.PATH_SEP);// Cluster name
buff.append(token).append(S3BackupPath.PATH_SEP);
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
buff.append(formatDate(time)).append(S3BackupPath.PATH_SEP);
buff.append(type).append(S3BackupPath.PATH_SEP);
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand Down Expand Up @@ -83,7 +83,7 @@ public void parseRemote(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
time = parseDate(pieces.get(4));
type = BackupFileType.valueOf(pieces.get(5));
if (type != BackupFileType.META && type != BackupFileType.CL) {
Expand All @@ -109,14 +109,14 @@ public void parsePartialPrefix(String remoteFilePath) {
baseDir = pieces.get(0);
region = pieces.get(1);
clusterName = pieces.get(2);
token = pieces.get(3);
nodeIdentifier = pieces.get(3);
}

@Override
public String remotePrefix(Date start, Date end, String location) {
StringBuffer buff = new StringBuffer(clusterPrefix(location));
token = factory.getInstance().getToken();
buff.append(token).append(S3BackupPath.PATH_SEP);
nodeIdentifier = instanceIdentity.getBackupIdentifier();
buff.append(nodeIdentifier).append(S3BackupPath.PATH_SEP);
// match the common characters to prefix.
buff.append(match(start, end));
return buff.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,9 @@ protected List<ReplaceableAttribute> createAttributesToRegister(PriamInstance in
instance.setUpdatetime(new Date().getTime());
List<ReplaceableAttribute> attrs = new ArrayList<ReplaceableAttribute>();
attrs.add(new ReplaceableAttribute(Attributes.INSTANCE_ID, instance.getInstanceId(), false));
attrs.add(new ReplaceableAttribute(Attributes.TOKEN, instance.getToken(), true));
if (instance.getToken() != null) {
attrs.add(new ReplaceableAttribute(Attributes.TOKEN, instance.getToken(), true));
}
attrs.add(new ReplaceableAttribute(Attributes.APP_ID, instance.getApp(), true));
attrs.add(new ReplaceableAttribute(Attributes.ID, Integer.toString(instance.getId()), true));
attrs.add(new ReplaceableAttribute(Attributes.AVAILABILITY_ZONE, instance.getRac(), true));
Expand All @@ -152,7 +154,9 @@ protected List<ReplaceableAttribute> createAttributesToRegister(PriamInstance in
protected List<Attribute> createAttributesToDeRegister(PriamInstance instance) {
List<Attribute> attrs = new ArrayList<Attribute>();
attrs.add(new Attribute(Attributes.INSTANCE_ID, instance.getInstanceId()));
attrs.add(new Attribute(Attributes.TOKEN, instance.getToken()));
if (instance.getToken() != null) {
attrs.add(new Attribute(Attributes.TOKEN, instance.getToken()));
}
attrs.add(new Attribute(Attributes.APP_ID, instance.getApp()));
attrs.add(new Attribute(Attributes.ID, Integer.toString(instance.getId())));
attrs.add(new Attribute(Attributes.AVAILABILITY_ZONE, instance.getRac()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public static TaskTimer getTimer(InstanceIdentity id) {
logger.info("Seed node. Instance id: {}"
+ ", host ip: {}"
+ ", host name: {}",
id.getInstance().getInstanceId(), id.getInstance().getHostIP(), id.getInstance().getHostName());
id.getInstanceId(), id.getHostIP(), id.getHostName());
return_ = new SimpleTimer(JOBNAME, 120 * 1000 + ran.nextInt(120 * 1000));
} else
return_ = new SimpleTimer(JOBNAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,21 @@ public enum BackupFileType {
protected String columnFamily;
protected String fileName;
protected String baseDir;
protected String token;
protected String nodeIdentifier;
protected String region;
protected Date time;
protected long size; //uncompressed file size
protected long compressedFileSize = 0;
protected boolean isCassandra1_0;

protected final InstanceIdentity factory;
protected final InstanceIdentity instanceIdentity;
protected final IConfiguration config;
protected File backupFile;
protected Date uploadedTs;
protected int awsSlowDownExceptionCounter = 0;

public AbstractBackupPath(IConfiguration config, InstanceIdentity factory) {
this.factory = factory;
public AbstractBackupPath(IConfiguration config, InstanceIdentity instanceIdentity) {
this.instanceIdentity = instanceIdentity;
this.config = config;
}

Expand All @@ -93,7 +93,7 @@ public void parseLocal(File file, BackupFileType type) throws ParseException {
this.clusterName = config.getAppName();
this.baseDir = config.getBackupLocation();
this.region = config.getDC();
this.token = factory.getInstance().getToken();
this.nodeIdentifier = instanceIdentity.getBackupIdentifier();
this.type = type;
if (type != BackupFileType.META && type != BackupFileType.CL) {
this.keyspace = elements[0];
Expand Down Expand Up @@ -214,8 +214,8 @@ public String getBaseDir() {
return baseDir;
}

public String getToken() {
return token;
public String getNodeIdentifier() {
return nodeIdentifier;
}

public String getRegion() {
Expand Down Expand Up @@ -262,7 +262,7 @@ public void setFileName(String fileName) {
}

public InstanceIdentity getInstanceIdentity() {
return this.factory;
return this.instanceIdentity;
}

public void setUploadedTs(Date uploadedTs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public void execute() throws Exception {

Date startTime = Calendar.getInstance(TimeZone.getTimeZone("GMT")).getTime();
snapshotName = pathFactory.get().formatDate(startTime);
String token = instanceIdentity.getInstance().getToken();
String backupIdentifier = instanceIdentity.getBackupIdentifier();

// Save start snapshot status
BackupMetadata backupMetadata = new BackupMetadata(token, startTime);
BackupMetadata backupMetadata = new BackupMetadata(backupIdentifier, startTime);
snapshotStatusMgr.start(backupMetadata);

try {
Expand Down Expand Up @@ -158,6 +158,7 @@ public Void retriableCall() throws Exception {
}.call();
}


@Override
public String getName() {
return JOBNAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,7 @@ public String getCassProcessName() {
return config.get(CONFIG_CASS_PROCESS_NAME, DEFAULT_CASS_PROCESS_NAME);
}

@Override
public int getNumTokens() {
return config.get(CONFIG_VNODE_NUM_TOKENS, DEFAULT_VNODE_NUM_TOKENS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.netflix.priam.utils.ITokenManager;
import com.netflix.priam.utils.RetryableCallable;
import com.netflix.priam.utils.Sleeper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -78,8 +79,12 @@ public boolean test(PriamInstance input) {
};

private PriamInstance myInstance;
private String backupIdentifier;
private String token;
private boolean outOfService = false;
private boolean isReplace = false;
private boolean isTokenPregenerated = false;
private boolean initialized = true;
private String replacedIp = "";
private IDeadTokenRetriever deadTokenRetriever;
private IPreGeneratedTokenRetriever preGeneratedTokenRetriever;
Expand All @@ -104,7 +109,7 @@ public InstanceIdentity(IPriamInstanceFactory factory, IMembership membership, I
init();
}

public PriamInstance getInstance() {
PriamInstance getInstance() {
return myInstance;
}

Expand All @@ -118,7 +123,7 @@ public PriamInstance retriableCall() throws Exception {
for (PriamInstance ins : deadInstances) {
logger.info("[Dead] Iterating though the hosts: {}", ins.getInstanceId());
if (ins.getInstanceId().equals(config.getInstanceName())) {
ins.setOutOfService(true);
outOfService = true;
logger.info("[Dead] found that this node is dead."
+ " application: {}"
+ ", id: {}"
Expand Down Expand Up @@ -242,6 +247,16 @@ public void forEachExecution() {
}

logger.info("My token: {}", myInstance.getToken());

if (myInstance.getToken() == null || myInstance.getToken().isEmpty()) {
backupIdentifier = "virual" + Integer.toString(myInstance.getId());
} else
{
backupIdentifier = myInstance.getToken();
}

token = myInstance.getToken();
initialized = true;
}

private void populateRacMap() {
Expand Down Expand Up @@ -304,4 +319,49 @@ public String getReplacedIp() {
private static boolean isInstanceDummy(PriamInstance instance) {
return instance.getInstanceId().equals(DUMMY_INSTANCE_ID);
}

public boolean isOutOfService()
{
return outOfService;
}

public String getBackupIdentifier()
{
return backupIdentifier;
}

public void setBackupIdentifier(String backupIdentifier)
{
this.backupIdentifier = backupIdentifier;
}

public String getToken()
{
return token;
}

public void setToken(String token)
{
this.token = token;
}

public String getInstanceId()
{
return myInstance.getInstanceId();
}

public String getHostIP()
{
return myInstance.getHostIP();
}

public String getHostName()
{
return myInstance.getHostName();
}

public boolean isExternallyDefinedToken()
{
return initialized && StringUtils.isNotBlank(token);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,13 +130,5 @@ public void setUpdatetime(long updatetime) {
this.updatetime = updatetime;
}

public boolean isOutOfService() {
return outOfService;
}

public void setOutOfService(boolean outOfService) {
this.outOfService = outOfService;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -132,21 +132,23 @@ public String getReplaceIp() {

private String findReplaceIp(List<PriamInstance> allIds, String token, String location) {
String ip = null;
for (PriamInstance ins : allIds) {
logger.info("Calling getIp on hostname[{}] and token[{}]", ins.getHostName(), token);
if (ins.getToken().equals(token) || !ins.getDC().equals(location)) { //avoid using dead instance and other regions' instances
continue;
}

try {
ip = getIp(ins.getHostName(), token);
} catch (ParseException e) {
ip = null;
}

if (ip != null) {
logger.info("Found the IP: {}", ip);
return ip;
if (token != null) {
for (PriamInstance ins : allIds) {
logger.info("Calling getIp on hostname[{}] and token[{}]", ins.getHostName(), token);
if (token.equalsIgnoreCase(ins.getToken()) || !ins.getDC().equals(location)) { //avoid using dead instance and other regions' instances
continue;
}

try {
ip = getIp(ins.getHostName(), token);
} catch (ParseException e) {
ip = null;
}

if (ip != null) {
logger.info("Found the IP: {}", ip);
return ip;
}
}
}

Expand Down
Loading

0 comments on commit 0712a9e

Please sign in to comment.