1# This file is part of cloud-init. See LICENSE file for license information. 2 3import json 4import os 5import stat 6 7from cloudinit import atomic_helper 8 9from cloudinit.tests.helpers import CiTestCase 10 11 12class TestAtomicHelper(CiTestCase): 13 def test_basic_usage(self): 14 """write_file takes bytes if no omode.""" 15 path = self.tmp_path("test_basic_usage") 16 contents = b"Hey there\n" 17 atomic_helper.write_file(path, contents) 18 self.check_file(path, contents) 19 20 def test_string(self): 21 """write_file can take a string with mode w.""" 22 path = self.tmp_path("test_string") 23 contents = "Hey there\n" 24 atomic_helper.write_file(path, contents, omode="w") 25 self.check_file(path, contents, omode="r") 26 27 def test_file_permissions(self): 28 """write_file with mode 400 works correctly.""" 29 path = self.tmp_path("test_file_permissions") 30 contents = b"test_file_perms" 31 atomic_helper.write_file(path, contents, mode=0o400) 32 self.check_file(path, contents, perms=0o400) 33 34 def test_write_json(self): 35 """write_json output is readable json.""" 36 path = self.tmp_path("test_write_json") 37 data = {'key1': 'value1', 'key2': ['i1', 'i2']} 38 atomic_helper.write_json(path, data) 39 with open(path, "r") as fp: 40 found = json.load(fp) 41 self.assertEqual(data, found) 42 self.check_perms(path, 0o644) 43 44 def check_file(self, path, content, omode=None, perms=0o644): 45 if omode is None: 46 omode = "rb" 47 self.assertTrue(os.path.exists(path)) 48 self.assertTrue(os.path.isfile(path)) 49 with open(path, omode) as fp: 50 found = fp.read() 51 self.assertEqual(content, found) 52 self.check_perms(path, perms) 53 54 def check_perms(self, path, perms): 55 file_stat = os.stat(path) 56 self.assertEqual(perms, stat.S_IMODE(file_stat.st_mode)) 57 58# vi: ts=4 expandtab 59