Compare commits

...

8 Commits

Author SHA1 Message Date
Joona Hoikkala
8b59032614 Requested changes 2017-10-31 16:28:52 +02:00
Joona Hoikkala
c0a7600deb Lint fix 2017-10-17 11:20:24 +03:00
Joona Hoikkala
886b919659 Named tuple values for readability 2017-10-14 14:07:09 +03:00
Joona Hoikkala
296b4aaeea Merge remote-tracking branch 'origin/master' into nginx-ipv6 2017-10-14 08:35:58 +03:00
Joona Hoikkala
da3ae1611d Make ipv6_info() port aware 2017-10-11 17:29:02 +03:00
Joona Hoikkala
f4ee96ea6e IPv6 tests to Nginx plugin 2017-10-11 16:35:50 +03:00
Joona Hoikkala
f986c45d60 Test and lint fixes 2017-10-11 12:10:20 +03:00
Joona Hoikkala
994a6ce114 Nginx IPv6 support 2017-10-11 12:10:20 +03:00
9 changed files with 171 additions and 39 deletions

View File

@@ -257,6 +257,31 @@ class NginxConfigurator(common.Installer):
return vhost
def ipv6_info(self, port):
"""Returns tuple of booleans (ipv6_active, ipv6only_present)
ipv6_active is true if any server block listens ipv6 address in any port
ipv6only_present is true if ipv6only=on option exists in any server
block ipv6 listen directive for the specified port.
:param str port: Port to check ipv6only=on directive for
:returns: Tuple containing information if IPv6 is enabled in the global
configuration, and existence of ipv6only directive for specified port
:rtype: tuple of type (bool, bool)
"""
vhosts = self.parser.get_vhosts()
ipv6_active = False
ipv6only_present = False
for vh in vhosts:
for addr in vh.addrs:
if addr.ipv6:
ipv6_active = True
if addr.ipv6only and addr.get_port() == port:
ipv6only_present = True
return (ipv6_active, ipv6only_present)
def _vhost_from_duplicated_default(self, domain):
if self.new_vhost is None:
default_vhost = self._get_default_vhost()
@@ -446,9 +471,12 @@ class NginxConfigurator(common.Installer):
all_names.add(host)
elif not common.private_ips_regex.match(host):
# If it isn't a private IP, do a reverse DNS lookup
# TODO: IPv6 support
try:
socket.inet_aton(host)
if addr.ipv6:
host = addr.get_ipv6_exploded()
socket.inet_pton(socket.AF_INET6, host)
else:
socket.inet_pton(socket.AF_INET, host)
all_names.add(socket.gethostbyaddr(host)[0])
except (socket.error, socket.herror, socket.timeout):
continue
@@ -484,16 +512,38 @@ class NginxConfigurator(common.Installer):
:type vhost: :class:`~certbot_nginx.obj.VirtualHost`
"""
ipv6info = self.ipv6_info(self.config.tls_sni_01_port)
ipv6_block = ['']
ipv4_block = ['']
# If the vhost was implicitly listening on the default Nginx port,
# have it continue to do so.
if len(vhost.addrs) == 0:
listen_block = [['\n ', 'listen', ' ', self.DEFAULT_LISTEN_PORT]]
self.parser.add_server_directives(vhost, listen_block, replace=False)
if vhost.ipv6_enabled():
ipv6_block = ['\n ',
'listen',
' ',
'[::]:{0} ssl'.format(self.config.tls_sni_01_port)]
if not ipv6info[1]:
# ipv6only=on is absent in global config
ipv6_block.append(' ')
ipv6_block.append('ipv6only=on')
if vhost.ipv4_enabled():
ipv4_block = ['\n ',
'listen',
' ',
'{0} ssl'.format(self.config.tls_sni_01_port)]
snakeoil_cert, snakeoil_key = self._get_snakeoil_paths()
ssl_block = ([
['\n ', 'listen', ' ', '{0} ssl'.format(self.config.tls_sni_01_port)],
ipv6_block,
ipv4_block,
['\n ', 'ssl_certificate', ' ', snakeoil_cert],
['\n ', 'ssl_certificate_key', ' ', snakeoil_key],
['\n ', 'include', ' ', self.mod_ssl_conf],

View File

@@ -34,10 +34,13 @@ class Addr(common.Addr):
UNSPECIFIED_IPV4_ADDRESSES = ('', '*', '0.0.0.0')
CANONICAL_UNSPECIFIED_ADDRESS = UNSPECIFIED_IPV4_ADDRESSES[0]
def __init__(self, host, port, ssl, default):
def __init__(self, host, port, ssl, default, ipv6, ipv6only):
# pylint: disable=too-many-arguments
super(Addr, self).__init__((host, port))
self.ssl = ssl
self.default = default
self.ipv6 = ipv6
self.ipv6only = ipv6only
self.unspecified_address = host in self.UNSPECIFIED_IPV4_ADDRESSES
@classmethod
@@ -46,6 +49,8 @@ class Addr(common.Addr):
parts = str_addr.split(' ')
ssl = False
default = False
ipv6 = False
ipv6only = False
host = ''
port = ''
@@ -56,15 +61,25 @@ class Addr(common.Addr):
if addr.startswith('unix:'):
return None
tup = addr.partition(':')
if re.match(r'^\d+$', tup[0]):
# This is a bare port, not a hostname. E.g. listen 80
host = ''
port = tup[0]
# IPv6 check
ipv6_match = re.match(r'\[.*\]', addr)
if ipv6_match:
ipv6 = True
# IPv6 handling
host = ipv6_match.group()
# The rest of the addr string will be the port, if any
port = addr[ipv6_match.end()+1:]
else:
# This is a host-port tuple. E.g. listen 127.0.0.1:*
host = tup[0]
port = tup[2]
# IPv4 handling
tup = addr.partition(':')
if re.match(r'^\d+$', tup[0]):
# This is a bare port, not a hostname. E.g. listen 80
host = ''
port = tup[0]
else:
# This is a host-port tuple. E.g. listen 127.0.0.1:*
host = tup[0]
port = tup[2]
# The rest of the parts are options; we only care about ssl and default
while len(parts) > 0:
@@ -73,8 +88,10 @@ class Addr(common.Addr):
ssl = True
elif nextpart == 'default_server':
default = True
elif nextpart == "ipv6only=on":
ipv6only = True
return cls(host, port, ssl, default)
return cls(host, port, ssl, default, ipv6, ipv6only)
def to_string(self, include_default=True):
"""Return string representation of Addr"""
@@ -114,8 +131,6 @@ class Addr(common.Addr):
self.tup[1]), self.ipv6) == \
common.Addr((other.CANONICAL_UNSPECIFIED_ADDRESS,
other.tup[1]), other.ipv6)
# Nginx plugin currently doesn't support IPv6 but this will
# future-proof it
return super(Addr, self).__eq__(other)
def __eq__(self, other):
@@ -195,6 +210,20 @@ class VirtualHost(object): # pylint: disable=too-few-public-methods
return True
return False
def ipv6_enabled(self):
"""Return true if one or more of the listen directives in vhost supports
IPv6"""
for a in self.addrs:
if a.ipv6:
return True
def ipv4_enabled(self):
"""Return true if one or more of the listen directives in vhost are IPv4
only"""
for a in self.addrs:
if not a.ipv6:
return True
def _find_directive(directives, directive_name):
"""Find a directive of type directive_name in directives
"""

View File

@@ -46,7 +46,7 @@ class NginxConfiguratorTest(util.NginxTest):
def test_prepare(self):
self.assertEqual((1, 6, 2), self.config.version)
self.assertEqual(8, len(self.config.parser.parsed))
self.assertEqual(10, len(self.config.parser.parsed))
@mock.patch("certbot_nginx.configurator.util.exe_exists")
@mock.patch("certbot_nginx.configurator.subprocess.Popen")
@@ -90,7 +90,7 @@ class NginxConfiguratorTest(util.NginxTest):
self.assertEqual(names, set(
["155.225.50.69.nephoscale.net", "www.example.org", "another.alias",
"migration.com", "summer.com", "geese.com", "sslon.com",
"globalssl.com", "globalsslsetssl.com"]))
"globalssl.com", "globalsslsetssl.com", "ipv6.com", "ipv6ssl.com"]))
def test_supported_enhancements(self):
self.assertEqual(['redirect', 'staple-ocsp'],
@@ -132,6 +132,7 @@ class NginxConfiguratorTest(util.NginxTest):
server_conf = set(['somename', 'another.alias', 'alias'])
example_conf = set(['.example.com', 'example.*'])
foo_conf = set(['*.www.foo.com', '*.www.example.com'])
ipv6_conf = set(['ipv6.com'])
results = {'localhost': localhost_conf,
'alias': server_conf,
@@ -140,7 +141,8 @@ class NginxConfiguratorTest(util.NginxTest):
'www.example.com': example_conf,
'test.www.example.com': foo_conf,
'abc.www.foo.com': foo_conf,
'www.bar.co.uk': localhost_conf}
'www.bar.co.uk': localhost_conf,
'ipv6.com': ipv6_conf}
conf_path = {'localhost': "etc_nginx/nginx.conf",
'alias': "etc_nginx/nginx.conf",
@@ -149,7 +151,8 @@ class NginxConfiguratorTest(util.NginxTest):
'www.example.com': "etc_nginx/sites-enabled/example.com",
'test.www.example.com': "etc_nginx/foo.conf",
'abc.www.foo.com': "etc_nginx/foo.conf",
'www.bar.co.uk': "etc_nginx/nginx.conf"}
'www.bar.co.uk': "etc_nginx/nginx.conf",
'ipv6.com': "etc_nginx/sites-enabled/ipv6.com"}
bad_results = ['www.foo.com', 'example', 't.www.bar.co',
'69.255.225.155']
@@ -160,11 +163,24 @@ class NginxConfiguratorTest(util.NginxTest):
self.assertEqual(results[name], vhost.names)
self.assertEqual(conf_path[name], path)
# IPv6 specific checks
if name == "ipv6.com":
self.assertTrue(vhost.ipv6_enabled())
# Make sure that we have SSL enabled also for IPv6 addr
self.assertTrue(
any([True for x in vhost.addrs if x.ssl and x.ipv6]))
for name in bad_results:
self.assertRaises(errors.MisconfigurationError,
self.config.choose_vhost, name)
def test_ipv6only(self):
# ipv6_info: (ipv6_active, ipv6only_present)
self.assertEquals((True, False), self.config.ipv6_info("80"))
# Port 443 has ipv6only=on because of ipv6ssl.com vhost
self.assertEquals((True, True), self.config.ipv6_info("443"))
def test_more_info(self):
self.assertTrue('nginx.conf' in self.config.more_info())

View File

@@ -50,7 +50,9 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
'sites-enabled/example.com',
'sites-enabled/migration.com',
'sites-enabled/sslon.com',
'sites-enabled/globalssl.com']]),
'sites-enabled/globalssl.com',
'sites-enabled/ipv6.com',
'sites-enabled/ipv6ssl.com']]),
set(nparser.parsed.keys()))
self.assertEqual([['server_name', 'somename', 'alias', 'another.alias']],
nparser.parsed[nparser.abs_path('server.conf')])
@@ -74,7 +76,7 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
parsed = nparser._parse_files(nparser.abs_path(
'sites-enabled/example.com.test'))
self.assertEqual(3, len(glob.glob(nparser.abs_path('*.test'))))
self.assertEqual(5, len(
self.assertEqual(7, len(
glob.glob(nparser.abs_path('sites-enabled/*.test'))))
self.assertEqual([[['server'], [['listen', '69.50.225.155:9000'],
['listen', '127.0.0.1'],
@@ -110,7 +112,8 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
vhosts = nparser.get_vhosts()
vhost = obj.VirtualHost(nparser.abs_path('sites-enabled/globalssl.com'),
[obj.Addr('4.8.2.6', '57', True, False)],
[obj.Addr('4.8.2.6', '57', True, False,
False, False)],
True, True, set(['globalssl.com']), [], [0])
globalssl_com = [x for x in vhosts if 'globalssl.com' in x.filep][0]
@@ -121,35 +124,42 @@ class NginxParserTest(util.NginxTest): #pylint: disable=too-many-public-methods
vhosts = nparser.get_vhosts()
vhost1 = obj.VirtualHost(nparser.abs_path('nginx.conf'),
[obj.Addr('', '8080', False, False)],
[obj.Addr('', '8080', False, False,
False, False)],
False, True,
set(['localhost',
r'~^(www\.)?(example|bar)\.']),
[], [10, 1, 9])
vhost2 = obj.VirtualHost(nparser.abs_path('nginx.conf'),
[obj.Addr('somename', '8080', False, False),
obj.Addr('', '8000', False, False)],
[obj.Addr('somename', '8080', False, False,
False, False),
obj.Addr('', '8000', False, False,
False, False)],
False, True,
set(['somename', 'another.alias', 'alias']),
[], [10, 1, 12])
vhost3 = obj.VirtualHost(nparser.abs_path('sites-enabled/example.com'),
[obj.Addr('69.50.225.155', '9000',
False, False),
obj.Addr('127.0.0.1', '', False, False)],
False, False, False, False),
obj.Addr('127.0.0.1', '', False, False,
False, False)],
False, True,
set(['.example.com', 'example.*']), [], [0])
vhost4 = obj.VirtualHost(nparser.abs_path('sites-enabled/default'),
[obj.Addr('myhost', '', False, True),
obj.Addr('otherhost', '', False, True)],
[obj.Addr('myhost', '', False, True,
False, False),
obj.Addr('otherhost', '', False, True,
False, False)],
False, True, set(['www.example.org']),
[], [0])
vhost5 = obj.VirtualHost(nparser.abs_path('foo.conf'),
[obj.Addr('*', '80', True, True)],
[obj.Addr('*', '80', True, True,
False, False)],
True, True, set(['*.www.foo.com',
'*.www.example.com']),
[], [2, 1, 0])
self.assertEqual(10, len(vhosts))
self.assertEqual(12, len(vhosts))
example_com = [x for x in vhosts if 'example.com' in x.filep][0]
self.assertEqual(vhost3, example_com)
default = [x for x in vhosts if 'default' in x.filep][0]

View File

@@ -0,0 +1,5 @@
server {
listen 80;
listen [::]:80;
server_name ipv6.com;
}

View File

@@ -0,0 +1,5 @@
server {
listen 443 ssl;
listen [::]:443 ssl ipv6only=on;
server_name ipv6ssl.com;
}

View File

@@ -125,10 +125,10 @@ class TlsSniPerformTest(util.NginxTest):
self.sni.add_chall(self.achalls[0])
self.sni.add_chall(self.achalls[2])
v_addr1 = [obj.Addr("69.50.225.155", "9000", True, False),
obj.Addr("127.0.0.1", "", False, False)]
v_addr2 = [obj.Addr("myhost", "", False, True)]
v_addr2_print = [obj.Addr("myhost", "", False, False)]
v_addr1 = [obj.Addr("69.50.225.155", "9000", True, False, False, False),
obj.Addr("127.0.0.1", "", False, False, False, False)]
v_addr2 = [obj.Addr("myhost", "", False, True, False, False)]
v_addr2_print = [obj.Addr("myhost", "", False, False, False, False)]
ll_addr = [v_addr1, v_addr2]
self.sni._mod_config(ll_addr) # pylint: disable=protected-access

View File

@@ -51,14 +51,32 @@ class NginxTlsSni01(common.TLSSNI01):
default_addr = "{0} ssl".format(
self.configurator.config.tls_sni_01_port)
ipv6, ipv6only = self.configurator.ipv6_info(
self.configurator.config.tls_sni_01_port)
for achall in self.achalls:
vhost = self.configurator.choose_vhost(achall.domain, raise_if_no_match=False)
if vhost is not None and vhost.addrs:
addresses.append(list(vhost.addrs))
else:
addresses.append([obj.Addr.fromstring(default_addr)])
logger.info("Using default address %s for TLSSNI01 authentication.", default_addr)
if ipv6:
# If IPv6 is active in Nginx configuration
ipv6_addr = "[::]:{0} ssl".format(
self.configurator.config.tls_sni_01_port)
if not ipv6only:
# If ipv6only=on is not already present in the config
ipv6_addr = ipv6_addr + " ipv6only=on"
addresses.append([obj.Addr.fromstring(default_addr),
obj.Addr.fromstring(ipv6_addr)])
logger.info(("Using default addresses %s and %s for " +
"TLSSNI01 authentication."),
default_addr,
ipv6_addr)
else:
addresses.append([obj.Addr.fromstring(default_addr)])
logger.info("Using default address %s for TLSSNI01 authentication.",
default_addr)
# Create challenge certs
responses = [self._setup_challenge_cert(x) for x in self.achalls]
@@ -112,7 +130,6 @@ class NginxTlsSni01(common.TLSSNI01):
raise errors.MisconfigurationError(
'Certbot could not find an HTTP block to include '
'TLS-SNI-01 challenges in %s.' % root)
config = [self._make_server_block(pair[0], pair[1])
for pair in six.moves.zip(self.achalls, ll_addrs)]
config = nginxparser.UnspacedList(config)

View File

@@ -251,7 +251,7 @@ class Addr(object):
"""Normalized representation of addr/port tuple
"""
if self.ipv6:
return (self._normalize_ipv6(self.tup[0]), self.tup[1])
return (self.get_ipv6_exploded(), self.tup[1])
return self.tup
def __eq__(self, other):