forked from theupdateframework/python-tuf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrepository_tool.py
executable file
·3291 lines (2473 loc) · 110 KB
/
repository_tool.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# Copyright 2013 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
repository_tool.py
<Author>
Vladimir Diaz <[email protected]>
<Started>
October 19, 2013
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Provide a tool that can create a TUF repository. It can be used with the
Python interpreter in interactive mode, or imported directly into a Python
module. See 'tuf/README' for the complete guide to using
'tuf.repository_tool.py'.
"""
import os
import time
import datetime
import logging
import tempfile
import shutil
import json
from collections import deque
from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import formats as sslib_formats
from securesystemslib import util as sslib_util
from securesystemslib import storage as sslib_storage
from tuf import exceptions
from tuf import formats
from tuf import keydb
from tuf import log
from tuf import repository_lib as repo_lib
from tuf import roledb
# Copy API
# pylint: disable=unused-import
# Copy generic repository API functions to be used via `repository_tool`
from tuf.repository_lib import (
create_tuf_client_directory,
disable_console_log_messages)
# Copy key-related API functions to be used via `repository_tool`
from tuf.repository_lib import (
import_rsa_privatekey_from_file,
import_ed25519_privatekey_from_file)
from securesystemslib.interface import (
generate_and_write_rsa_keypair,
generate_and_write_rsa_keypair_with_prompt,
generate_and_write_unencrypted_rsa_keypair,
generate_and_write_ecdsa_keypair,
generate_and_write_ecdsa_keypair_with_prompt,
generate_and_write_unencrypted_ecdsa_keypair,
generate_and_write_ed25519_keypair,
generate_and_write_ed25519_keypair_with_prompt,
generate_and_write_unencrypted_ed25519_keypair,
import_rsa_publickey_from_file,
import_ecdsa_publickey_from_file,
import_ed25519_publickey_from_file,
import_ecdsa_privatekey_from_file)
from securesystemslib.keys import (
format_metadata_to_key,
generate_rsa_key,
generate_ecdsa_key,
generate_ed25519_key,
import_rsakey_from_pem,
import_ecdsakey_from_pem)
# Module-level logger. See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger(__name__)
# Add a console handler so that users are aware of potentially unintended
# states, such as multiple roles that share keys.  Note that both calls below
# run as a side effect of importing this module.
log.add_console_handler()
log.set_console_log_level(logging.INFO)
# Recommended RSA key sizes:
# https://en.wikipedia.org/wiki/Key_size#Asymmetric_algorithm_key_lengths
# Based on the above, RSA keys of size 3072 are expected to provide security
# through 2031 and beyond.
DEFAULT_RSA_KEY_BITS=3072
# The default number of hashed bin delegations.
DEFAULT_NUM_BINS=1024
# The targets and metadata directory names.  Metadata files are written
# to the staged metadata directory instead of the "live" one.
METADATA_STAGED_DIRECTORY_NAME = 'metadata.staged'
METADATA_DIRECTORY_NAME = 'metadata'
TARGETS_DIRECTORY_NAME = 'targets'
# The file extension of TUF metadata files (appended to a rolename to form
# the metadata filename).
METADATA_EXTENSION = '.json'
# Expiration date deltas, in seconds, of the top-level roles.  A metadata
# expiration date is set by taking the current time and adding the expiration
# seconds listed below.
# Initial 'root.json' expiration time of 1 year (about 365.24 days).
ROOT_EXPIRATION = 31556900
# Initial 'targets.json' expiration time of 3 months (about 91.3 days).
TARGETS_EXPIRATION = 7889230
# Initial 'snapshot.json' expiration time of 1 week (7 * 86400 seconds).
SNAPSHOT_EXPIRATION = 604800
# Initial 'timestamp.json' expiration time of 1 day (24 * 3600 seconds).
TIMESTAMP_EXPIRATION = 86400
class Repository(object):
    """
    <Purpose>
      Represent a TUF repository that contains the metadata of the top-level
      roles, including all those delegated from the 'targets.json' role.  The
      repository object provides access to the top-level roles, and any
      delegated targets that are added as the repository is modified.  For
      example, a Repository object named 'repository' provides the following
      access by default:

        repository.root.version = 2
        repository.timestamp.expiration = datetime.datetime(2015, 8, 8, 12, 0)
        repository.snapshot.add_verification_key(...)
        repository.targets.delegate('unclaimed', ...)

      Delegating a role from 'targets' updates the attributes of the parent
      delegation, which then provides:

        repository.targets('unclaimed').add_verification_key(...)

    <Arguments>
      repository_directory:
        The root folder of the repository that contains the metadata and
        targets sub-directories.

      metadata_directory:
        The metadata sub-directory contains the files of the top-level
        roles, including all roles delegated from 'targets.json'.

      targets_directory:
        The targets sub-directory contains all the target files that are
        downloaded by clients and are referenced in TUF Metadata.  The hashes
        and file lengths are listed in Metadata files so that they are
        securely downloaded.  Metadata files are similarly referenced in the
        top-level metadata.

      storage_backend:
        An object which implements
        securesystemslib.storage.StorageBackendInterface.

      repository_name:
        The name of the repository.  If not supplied, the 'default'
        repository is used.

      use_timestamp_length:
        Whether to include the optional length attribute of the snapshot
        metadata file in the timestamp metadata.
        Default is True.

      use_timestamp_hashes:
        Whether to include the optional hashes attribute of the snapshot
        metadata file in the timestamp metadata.
        Default is True.

      use_snapshot_length:
        Whether to include the optional length attribute for targets
        metadata files in the snapshot metadata.
        Default is False to save bandwidth but without losing security
        from rollback attacks.
        Read more at section 5.6 from the Mercury paper:
        https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy

      use_snapshot_hashes:
        Whether to include the optional hashes attribute for targets
        metadata files in the snapshot metadata.
        Default is False to save bandwidth but without losing security
        from rollback attacks.
        Read more at section 5.6 from the Mercury paper:
        https://www.usenix.org/conference/atc17/technical-sessions/presentation/kuppusamy

    <Exceptions>
      securesystemslib.exceptions.FormatError, if the arguments are improperly
      formatted.

    <Side Effects>
      Creates top-level role objects and assigns them as attributes.  Creates
      the roledb and keydb entries for 'repository_name' if they do not exist
      yet.

    <Returns>
      A Repository object that contains default Metadata objects for the
      top-level roles.
    """

    def __init__(self, repository_directory, metadata_directory,
            targets_directory, storage_backend, repository_name='default',
            use_timestamp_length=True, use_timestamp_hashes=True,
            use_snapshot_length=False, use_snapshot_hashes=False):

        # Do the arguments have the correct format?
        # Ensure the arguments have the appropriate number of objects and
        # object types, and that all dict keys are properly named.  Raise
        # 'securesystemslib.exceptions.FormatError' if any are improperly
        # formatted.
        sslib_formats.PATH_SCHEMA.check_match(repository_directory)
        sslib_formats.PATH_SCHEMA.check_match(metadata_directory)
        sslib_formats.PATH_SCHEMA.check_match(targets_directory)
        sslib_formats.NAME_SCHEMA.check_match(repository_name)
        sslib_formats.BOOLEAN_SCHEMA.check_match(use_timestamp_length)
        sslib_formats.BOOLEAN_SCHEMA.check_match(use_timestamp_hashes)
        sslib_formats.BOOLEAN_SCHEMA.check_match(use_snapshot_length)
        sslib_formats.BOOLEAN_SCHEMA.check_match(use_snapshot_hashes)

        self._repository_directory = repository_directory
        self._metadata_directory = metadata_directory
        self._targets_directory = targets_directory
        self._repository_name = repository_name
        self._storage_backend = storage_backend
        self._use_timestamp_length = use_timestamp_length
        self._use_timestamp_hashes = use_timestamp_hashes
        self._use_snapshot_length = use_snapshot_length
        self._use_snapshot_hashes = use_snapshot_hashes

        # Create each database independently.  With both calls inside a
        # single 'try', an already-existing roledb would raise before the
        # keydb creation was even attempted, leaving the keydb missing.
        for create_database in (roledb.create_roledb, keydb.create_keydb):
            try:
                create_database(repository_name)

            except sslib_exceptions.InvalidNameError:
                logger.debug(repr(repository_name) + ' already exists.  Overwriting'
                    ' its contents.')

        # Set the top-level role objects.
        self.root = Root(self._repository_name)
        self.snapshot = Snapshot(self._repository_name)
        self.timestamp = Timestamp(self._repository_name)
        self.targets = Targets(self._targets_directory, 'targets',
            repository_name=self._repository_name)

    def _top_level_metadata_filenames(self):
        """
        <Purpose>
          Build the mapping of top-level rolenames to the paths of their
          metadata files inside this repository's metadata directory.

        <Arguments>
          None.

        <Exceptions>
          None.

        <Side Effects>
          None.

        <Returns>
          A dict with the keys 'root', 'targets', 'snapshot' and 'timestamp'.
        """

        metadata_directory = self._metadata_directory

        return {
            'root': os.path.join(metadata_directory, repo_lib.ROOT_FILENAME),
            'targets': os.path.join(metadata_directory,
                repo_lib.TARGETS_FILENAME),
            'snapshot': os.path.join(metadata_directory,
                repo_lib.SNAPSHOT_FILENAME),
            'timestamp': os.path.join(metadata_directory,
                repo_lib.TIMESTAMP_FILENAME)}

    def writeall(self, consistent_snapshot=False, use_existing_fileinfo=False):
        """
        <Purpose>
          Write all the JSON Metadata objects to their corresponding files for
          roles which have changed.
          writeall() raises an exception if any of the role metadata to be
          written to disk is invalid, such as an insufficient threshold of
          signatures, missing private keys, etc.

        <Arguments>
          consistent_snapshot:
            A boolean indicating whether role metadata files should have their
            version numbers as filename prefix when written to disk, i.e
            'VERSION.ROLENAME.json', and target files should be copied to a
            filename that has their hex digest as filename prefix, i.e
            'HASH.FILENAME'. Note that:
            - root metadata is always written with a version prefix,
              independently of 'consistent_snapshot'
            - the latest version of each metadata file is always also written
              without version prefix
            - target files are only copied to a hash-prefixed filename if
              'consistent_snapshot' is True and 'use_existing_fileinfo' is
              False.  If both are True hash-prefixed target file copies must
              be created out-of-band.

          use_existing_fileinfo:
            Boolean indicating whether the fileinfo dicts in the roledb should
            be written as-is (True) or whether hashes should be generated
            (False, requires access to the targets files on-disk).

        <Exceptions>
          securesystemslib.exceptions.FormatError, if 'consistent_snapshot' is
          not a boolean.

          tuf.exceptions.UnsignedMetadataError, if any of the top-level
          and delegated roles do not have the minimum threshold of signatures.

        <Side Effects>
          Creates metadata files in the repository's metadata directory and
          clears the dirty flag of the roles that were dirty on entry.

        <Returns>
          None.
        """

        # Does 'consistent_snapshot' have the correct format?
        # Raise 'securesystemslib.exceptions.FormatError' if it is improperly
        # formatted.
        sslib_formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)

        # At this point, keydb and roledb must be fully populated, otherwise
        # writeall() raises a 'tuf.exceptions.UnsignedMetadataError' if any of
        # the top-level roles are missing signatures, keys, etc.
        filenames = self._top_level_metadata_filenames()

        # Only set when 'snapshot' is written; it drives the cleanup of
        # obsolete metadata at the end of this method.
        snapshot_signable = None

        dirty_rolenames = roledb.get_dirty_roles(self._repository_name)

        # Write the metadata files of all the delegated Targets roles that
        # are dirty (i.e., have been modified via roledb.update_roleinfo()).
        for dirty_rolename in dirty_rolenames:

            # Ignore top-level roles, they will be generated later in this
            # method.
            if dirty_rolename in roledb.TOP_LEVEL_ROLES:
                continue

            dirty_filename = os.path.join(self._metadata_directory,
                dirty_rolename + METADATA_EXTENSION)
            repo_lib._generate_and_write_metadata(dirty_rolename,
                dirty_filename, self._targets_directory,
                self._metadata_directory, self._storage_backend,
                consistent_snapshot, filenames,
                repository_name=self._repository_name,
                use_existing_fileinfo=use_existing_fileinfo)

        # Metadata should be written in (delegated targets -> root ->
        # targets -> snapshot -> timestamp) order.  Begin by generating the
        # 'root.json' metadata file.  _generate_and_write_metadata() raises a
        # 'securesystemslib.exceptions.Error' exception if the metadata cannot
        # be written.
        root_roleinfo = roledb.get_roleinfo('root', self._repository_name)
        old_consistent_snapshot = root_roleinfo['consistent_snapshot']

        # Root must also be rewritten when the consistent snapshot setting
        # differs from the one recorded in its roleinfo.
        if ('root' in dirty_rolenames
                or consistent_snapshot != old_consistent_snapshot):
            repo_lib._generate_and_write_metadata('root', filenames['root'],
                self._targets_directory, self._metadata_directory,
                self._storage_backend, consistent_snapshot, filenames,
                repository_name=self._repository_name)

        # Generate the 'targets.json' metadata file.
        if 'targets' in dirty_rolenames:
            repo_lib._generate_and_write_metadata('targets',
                filenames['targets'], self._targets_directory,
                self._metadata_directory, self._storage_backend,
                consistent_snapshot,
                repository_name=self._repository_name,
                use_existing_fileinfo=use_existing_fileinfo)

        # Generate the 'snapshot.json' metadata file.
        if 'snapshot' in dirty_rolenames:
            snapshot_signable, _ = repo_lib._generate_and_write_metadata(
                'snapshot', filenames['snapshot'], self._targets_directory,
                self._metadata_directory, self._storage_backend,
                consistent_snapshot, filenames,
                repository_name=self._repository_name,
                use_snapshot_length=self._use_snapshot_length,
                use_snapshot_hashes=self._use_snapshot_hashes)

        # Generate the 'timestamp.json' metadata file.
        if 'timestamp' in dirty_rolenames:
            repo_lib._generate_and_write_metadata('timestamp',
                filenames['timestamp'], self._targets_directory,
                self._metadata_directory, self._storage_backend,
                consistent_snapshot, filenames,
                repository_name=self._repository_name,
                use_timestamp_length=self._use_timestamp_length,
                use_timestamp_hashes=self._use_timestamp_hashes)

        roledb.unmark_dirty(dirty_rolenames, self._repository_name)

        # Delete the metadata of roles no longer in 'roledb'.  Obsolete roles
        # may have been revoked and should no longer have their metadata files
        # available on disk, otherwise loading a repository may
        # unintentionally load them.
        if snapshot_signable is not None:
            repo_lib._delete_obsolete_metadata(self._metadata_directory,
                snapshot_signable['signed'], consistent_snapshot,
                self._repository_name, self._storage_backend)

    def write(self, rolename, consistent_snapshot=False,
            increment_version_number=True, use_existing_fileinfo=False):
        """
        <Purpose>
          Write the JSON metadata for 'rolename' to its corresponding file on
          disk.  Unlike writeall(), write() allows the metadata file to
          contain an invalid threshold of signatures.

        <Arguments>
          rolename:
            The name of the role to be written to disk.

          consistent_snapshot:
            A boolean indicating whether the role metadata file should have
            its version number as filename prefix when written to disk, i.e
            'VERSION.ROLENAME.json'. Note that:
            - root metadata is always written with a version prefix,
              independently of 'consistent_snapshot'
            - the latest version of the metadata file is always also written
              without version prefix
            - if the metadata is targets metadata and 'consistent_snapshot' is
              True, the corresponding target files are copied to a filename
              with their hex digest as filename prefix, i.e 'HASH.FILENAME',
              unless 'use_existing_fileinfo' is also True.
              If 'consistent_snapshot' and 'use_existing_fileinfo' both are
              True, hash-prefixed target file copies must be created
              out-of-band.

          increment_version_number:
            Boolean indicating whether the version number of 'rolename' should
            be automatically incremented.

          use_existing_fileinfo:
            Boolean indicating whether the fileinfo dicts in the roledb should
            be written as-is (True) or whether hashes should be generated
            (False, requires access to the targets files on-disk).

        <Exceptions>
          None.

        <Side Effects>
          Creates metadata files in the repository's metadata directory and
          clears the dirty flag of 'rolename'.

        <Returns>
          None.
        """

        rolename_filename = os.path.join(self._metadata_directory,
            rolename + METADATA_EXTENSION)

        filenames = self._top_level_metadata_filenames()

        repo_lib._generate_and_write_metadata(rolename, rolename_filename,
            self._targets_directory, self._metadata_directory,
            self._storage_backend, consistent_snapshot,
            filenames=filenames, allow_partially_signed=True,
            increment_version_number=increment_version_number,
            repository_name=self._repository_name,
            use_existing_fileinfo=use_existing_fileinfo)

        # Ensure 'rolename' is no longer marked as dirty after the successful
        # write().
        roledb.unmark_dirty([rolename], self._repository_name)

    def status(self):
        """
        <Purpose>
          Determine the status of the top-level roles.  status() checks if
          each role provides sufficient public and private keys, signatures,
          and that a valid metadata file is generated if writeall() or write()
          were to be called.  Metadata files are temporarily written so that
          file hashes and lengths may be verified, determine if delegated role
          trust is fully obeyed, and target paths valid according to parent
          roles.  status() does not do a simple check for number of threshold
          keys and signatures.

        <Arguments>
          None.

        <Exceptions>
          None.

        <Side Effects>
          Generates and writes temporary metadata files.  The temporary
          directory is removed again before returning.

        <Returns>
          None.
        """

        # Create the scratch directory before entering the 'try' block, so
        # that the cleanup in 'finally' only ever runs on a real path.  If
        # mkdtemp() itself fails there is nothing to clean up, and its
        # exception propagates unmasked.
        temp_repository_directory = tempfile.mkdtemp()

        # Generate and write temporary metadata so that full verification of
        # metadata is possible, such as verifying signatures, digests, and
        # file content.  Ensure temporary files are removed after verification
        # results are completed.
        try:
            targets_directory = self._targets_directory
            metadata_directory = os.path.join(temp_repository_directory,
                METADATA_STAGED_DIRECTORY_NAME)
            os.mkdir(metadata_directory)

            # Verify the top-level roles and log the results.
            repo_lib._log_status_of_top_level_roles(targets_directory,
                metadata_directory, self._repository_name,
                self._storage_backend)

        finally:
            shutil.rmtree(temp_repository_directory, ignore_errors=True)

    def dirty_roles(self):
        """
        <Purpose>
          Print/log the roles that have been modified.  For example, if some
          role's version number is changed (repository.timestamp.version = 2),
          it is considered dirty and will be included in the list of dirty
          roles printed/logged here.  Unlike status(), signatures, public
          keys, targets, etc. are not verified.  status() should be called
          instead if the caller would like to verify if a valid role file is
          generated if writeall() were to be called.

        <Arguments>
          None.

        <Exceptions>
          None.

        <Side Effects>
          Logs the dirty roles at INFO level.

        <Returns>
          None.
        """

        logger.info('Dirty roles: ' + str(roledb.get_dirty_roles(
            self._repository_name)))

    def mark_dirty(self, roles):
        """
        <Purpose>
          Mark the list of 'roles' as dirty.

        <Arguments>
          roles:
            A list of roles to mark as dirty.  On the next write, these roles
            will be written to disk.

        <Exceptions>
          None.

        <Side Effects>
          Updates the dirty state kept in 'roledb' for this repository.

        <Returns>
          None.
        """

        roledb.mark_dirty(roles, self._repository_name)

    def unmark_dirty(self, roles):
        """
        <Purpose>
          No longer mark the list of 'roles' as dirty.

        <Arguments>
          roles:
            A list of roles that should no longer be marked as dirty.

        <Exceptions>
          None.

        <Side Effects>
          Updates the dirty state kept in 'roledb' for this repository.

        <Returns>
          None.
        """

        roledb.unmark_dirty(roles, self._repository_name)

    @staticmethod
    def get_filepaths_in_directory(files_directory, recursive_walk=False,
            followlinks=True):
        """
        <Purpose>
          Walk the given 'files_directory' and build a list of target files
          found.

        <Arguments>
          files_directory:
            The path to a directory of target files.

          recursive_walk:
            To recursively walk the directory, set recursive_walk=True.

          followlinks:
            To follow symbolic links, set followlinks=True.

        <Exceptions>
          securesystemslib.exceptions.FormatError, if the arguments are
          improperly formatted.

          securesystemslib.exceptions.Error, if 'files_directory' is not a
          valid directory.

          Python IO exceptions.

        <Side Effects>
          None.

        <Returns>
          A list of absolute paths to target files in the given
          'files_directory'.
        """

        # Do the arguments have the correct format?
        # Ensure the arguments have the appropriate number of objects and
        # object types, and that all dict keys are properly named.  Raise
        # 'securesystemslib.exceptions.FormatError' if any are improperly
        # formatted.
        sslib_formats.PATH_SCHEMA.check_match(files_directory)
        sslib_formats.BOOLEAN_SCHEMA.check_match(recursive_walk)
        sslib_formats.BOOLEAN_SCHEMA.check_match(followlinks)

        # Ensure a valid directory is given.
        if not os.path.isdir(files_directory):
            raise sslib_exceptions.Error(repr(files_directory) + ' is not'
                ' a directory.')

        # A list of the target filepaths found in 'files_directory'.
        targets = []

        # FIXME: We need a way to tell Python 2, but not Python 3, to return
        # filenames in Unicode; see #61 and:
        # http://docs.python.org/howto/unicode.html#unicode-filenames
        for dirpath, dirnames, filenames in os.walk(files_directory,
                followlinks=followlinks):
            for filename in filenames:
                full_target_path = os.path.join(os.path.abspath(dirpath),
                    filename)
                targets.append(full_target_path)

            # Prune the subdirectories to walk right now if we do not wish to
            # recursively walk 'files_directory'.  os.walk() honours in-place
            # modification of 'dirnames' in its default top-down mode.
            if recursive_walk is False:
                del dirnames[:]

            else:
                logger.debug('Not pruning subdirectories ' + repr(dirnames))

        return targets
class Metadata(object):
"""
<Purpose>
Provide a base class to represent a TUF Metadata role. There are four
top-level roles: Root, Targets, Snapshot, and Timestamp. The Metadata
class provides methods that are needed by all top-level roles, such as
adding and removing public keys, private keys, and signatures. Metadata
attributes, such as rolename, version, threshold, expiration, and key list
are also provided by the Metadata base class.
<Arguments>
None.
<Exceptions>
None.
<Side Effects>
None.
<Returns>
None.
"""
def __init__(self):
self._rolename = None
self._repository_name = None
def add_verification_key(self, key, expires=None):
    """
    <Purpose>
      Add 'key' to the role.  Adding a key, which should contain only the
      public portion, signifies the corresponding private key and signatures
      the role is expected to provide.  A threshold of signatures is required
      for a role to be considered properly signed.  If a metadata file
      contains an insufficient threshold of signatures, it must not be
      accepted.

    <Arguments>
      key:
        The role key to be added, conformant to
        'securesystemslib.formats.ANYKEY_SCHEMA'.  Adding a public key to a
        role means that its corresponding private key must generate and add
        its signature to the role.  A threshold number of signatures is
        required for a role to be fully signed.  Note that 'key' is modified
        in place: its 'expires' entry is set.

      expires:
        The date in which 'key' expires.  'expires' is a datetime.datetime()
        object.  If None, a default is chosen from the role name: 'root',
        'targets', 'snapshot' and 'timestamp' keys expire ROOT_EXPIRATION,
        TARGETS_EXPIRATION, SNAPSHOT_EXPIRATION and TIMESTAMP_EXPIRATION
        seconds from now, respectively; any other role uses
        TIMESTAMP_EXPIRATION.

    <Exceptions>
      securesystemslib.exceptions.FormatError, if any of the arguments are
      improperly formatted.

      securesystemslib.exceptions.Error, if the 'expires' datetime has already
      expired.

    <Side Effects>
      The role's entries in 'keydb' and 'roledb' are updated.

    <Returns>
      None.
    """

    # Does 'key' have the correct format?
    # Ensure the arguments have the appropriate number of objects and object
    # types, and that all dict keys are properly named.  Raise
    # 'securesystemslib.exceptions.FormatError' if any are improperly
    # formatted.
    sslib_formats.ANYKEY_SCHEMA.check_match(key)

    # If 'expires' is unset, choose a default expiration for 'key'.  By
    # default, Root, Targets, Snapshot, and Timestamp keys are set to expire
    # 1 year, 3 months, 1 week, and 1 day from the current time, respectively.
    # Role names are lowercase; the previous comparisons against 'Targets',
    # 'Snapshot' and 'Timestamp' could never match, so every non-root role
    # silently received the 1-day timestamp lifetime.
    if expires is None:
        default_lifetimes = {
            'root': ROOT_EXPIRATION,
            'targets': TARGETS_EXPIRATION,
            'snapshot': SNAPSHOT_EXPIRATION,
            'timestamp': TIMESTAMP_EXPIRATION,
        }
        # Roles other than the four top-level ones keep the shortest default.
        lifetime = default_lifetimes.get(self.rolename, TIMESTAMP_EXPIRATION)
        expires = \
            formats.unix_timestamp_to_datetime(int(time.time() + lifetime))

    # Is 'expires' a datetime.datetime() object?
    # Raise 'securesystemslib.exceptions.FormatError' if not.
    if not isinstance(expires, datetime.datetime):
        raise sslib_exceptions.FormatError(repr(expires) + ' is not a'
            ' datetime.datetime() object.')

    # Truncate the microseconds value to produce a correct schema string
    # of the form 'yyyy-mm-ddThh:mm:ssZ'.
    expires = expires.replace(microsecond=0)

    # Ensure the expiration has not already passed.
    current_datetime = \
        formats.unix_timestamp_to_datetime(int(time.time()))

    if expires < current_datetime:
        raise sslib_exceptions.Error(repr(key) + ' has already'
            ' expired.')

    # Update the key's 'expires' entry.
    expires = expires.isoformat() + 'Z'
    key['expires'] = expires

    # Ensure 'key', which should contain the public portion, is added to
    # 'keydb'.  Add 'key' to the list of recognized keys.
    # Keys may be shared, so do not raise an exception if 'key' has already
    # been loaded.
    try:
        keydb.add_key(key, repository_name=self._repository_name)

    except exceptions.KeyAlreadyExistsError:
        logger.warning('Adding a verification key that has already been used.')

    keyid = key['keyid']
    roleinfo = roledb.get_roleinfo(self.rolename, self._repository_name)

    # Save the keyids that are being replaced since certain roles will need to
    # re-sign metadata with these keys (e.g., root).  Use list() to make a
    # copy of roleinfo['keyids'] to ensure we're modifying distinct lists.
    previous_keyids = list(roleinfo['keyids'])

    # Add 'key' to the role's entry in 'roledb', and avoid duplicates.
    if keyid not in roleinfo['keyids']:
        roleinfo['keyids'].append(keyid)
        roleinfo['previous_keyids'] = previous_keyids

        roledb.update_roleinfo(self._rolename, roleinfo,
            repository_name=self._repository_name)
def remove_verification_key(self, key):
    """
    <Purpose>
      Drop 'key' from the list of keys the role currently recognizes.

    <Arguments>
      key:
        The role's key, conformant to 'securesystemslib.formats.ANYKEY_SCHEMA'.
        Only the public portion is needed.  It is expected to have been added
        earlier through add_verification_key().

    <Exceptions>
      securesystemslib.exceptions.FormatError, if 'key' is improperly
      formatted.

      securesystemslib.exceptions.Error, if the keyid of 'key' is not among
      the role's keyids.

    <Side Effects>
      Updates the role's 'roledb' entry.

    <Returns>
      None.
    """

    # Validate 'key' first; a mismatch raises
    # 'securesystemslib.exceptions.FormatError'.
    sslib_formats.ANYKEY_SCHEMA.check_match(key)

    target_keyid = key['keyid']
    role_entry = roledb.get_roleinfo(self.rolename, self._repository_name)
    known_keyids = role_entry['keyids']

    # Guard clause: nothing to remove if the key was never added.
    if target_keyid not in known_keyids:
        raise sslib_exceptions.Error('Verification key not found.')

    known_keyids.remove(target_keyid)
    roledb.update_roleinfo(self._rolename, role_entry,
        repository_name=self._repository_name)
def load_signing_key(self, key):
    """
    <Purpose>
      Register a role key that includes its private portion, so that the
      role's signatures can be produced when its metadata file is later
      written to disk.

    <Arguments>
      key:
        The role's key, conformant to 'securesystemslib.formats.ANYKEY_SCHEMA'.
        It must carry a non-empty private key value.

    <Exceptions>
      securesystemslib.exceptions.FormatError, if 'key' is improperly
      formatted.

      securesystemslib.exceptions.Error, if 'key' has no private portion.

    <Side Effects>
      Updates the role's 'keydb' and 'roledb' entries.

    <Returns>
      None.
    """

    # Validate 'key' first; a mismatch raises
    # 'securesystemslib.exceptions.FormatError'.
    sslib_formats.ANYKEY_SCHEMA.check_match(key)

    # Signatures cannot be generated without the private portion, so refuse
    # keys where it is absent or empty.
    keyval = key['keyval']
    if not ('private' in keyval and len(keyval['private'])):
        raise sslib_exceptions.Error('This is not a private key.')

    # The public-only version of this key may already be in the keydb.  In
    # that case replace it with the version that carries the private portion.
    try:
        keydb.add_key(key, repository_name=self._repository_name)

    except exceptions.KeyAlreadyExistsError:
        keydb.remove_key(key['keyid'], self._repository_name)
        keydb.add_key(key, repository_name=self._repository_name)

    # Record the keyid in the role's 'signing_keyids' unless already present.
    role_entry = roledb.get_roleinfo(self.rolename, self._repository_name)
    if key['keyid'] in role_entry['signing_keyids']:
        return

    role_entry['signing_keyids'].append(key['keyid'])
    roledb.update_roleinfo(self.rolename, role_entry,
        repository_name=self._repository_name)
def unload_signing_key(self, key):
    """
    <Purpose>
      Undo a previous load_signing_key(): the keyid of 'key' is taken out of
      the role's list of signing keyids.

    <Arguments>
      key:
        The role key to be unloaded, conformant to
        'securesystemslib.formats.ANYKEY_SCHEMA'.

    <Exceptions>
      securesystemslib.exceptions.FormatError, if 'key' is improperly
      formatted.

      securesystemslib.exceptions.Error, if the keyid of 'key' is not among
      the role's signing keyids.

    <Side Effects>
      Updates the signing keys of the role in 'roledb'.

    <Returns>
      None.
    """

    # Validate 'key' first; a mismatch raises
    # 'securesystemslib.exceptions.FormatError'.
    sslib_formats.ANYKEY_SCHEMA.check_match(key)

    role_entry = roledb.get_roleinfo(self.rolename, self._repository_name)
    signing_keyids = role_entry['signing_keyids']

    # Guard clause: the key must have been loaded before it can be unloaded.
    if key['keyid'] not in signing_keyids:
        raise sslib_exceptions.Error('Signing key not found.')

    # TODO: Should we consider removing keys from keydb that are no longer
    # associated with any roles?  There could be many no-longer-used keys
    # stored in the keydb if not.  For now, just unload the key.
    signing_keyids.remove(key['keyid'])
    roledb.update_roleinfo(self.rolename, role_entry,
        repository_name=self._repository_name)
def add_signature(self, signature, mark_role_as_dirty=True):
"""
<Purpose>
Add a signature to the role. A role is considered fully signed if it
contains a threshold of signatures. The 'signature' should have been
generated by the private key corresponding to one of the role's expected
keys.
>>>
>>>
>>>
<Arguments>
signature:
The signature to be added to the role, conformant to
'securesystemslib.formats.SIGNATURE_SCHEMA'.
mark_role_as_dirty:
A boolean indicating whether the updated 'roleinfo' for 'rolename'
should be marked as dirty. The caller might not want to mark
'rolename' as dirty if it is loading metadata from disk and only wants
to populate roledb.py. Likewise, add_role() would support a similar
boolean to allow the repository tools to successfully load roles via
load_repository() without needing to mark these roles as dirty (default
behavior).
<Exceptions>
securesystemslib.exceptions.FormatError, if the 'signature' argument is
improperly formatted.
<Side Effects>
Adds 'signature', if not already added, to the role's 'signatures' field
in 'roledb'.
<Returns>
None.
"""