Lines Matching refs:res

178 #define	ISCONNECTED(res, no)	\  argument
179 ((res)->hr_remotein != NULL && (res)->hr_remoteout != NULL)
280 cleanup(struct hast_resource *res) in cleanup() argument
288 if (res->hr_ggateunit >= 0) { in cleanup()
293 ggiod.gctl_unit = res->hr_ggateunit; in cleanup()
295 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DESTROY, &ggiod) == -1) { in cleanup()
298 res->hr_provname); in cleanup()
300 res->hr_ggateunit = -1; in cleanup()
333 hast_activemap_flush(struct hast_resource *res) __unlocks(res->hr_amp_lock) in hast_activemap_flush() argument
339 mtx_lock(&res->hr_amp_diskmap_lock); in hast_activemap_flush()
340 buf = activemap_bitmap(res->hr_amp, &size); in hast_activemap_flush()
341 mtx_unlock(&res->hr_amp_lock); in hast_activemap_flush()
343 PJDLOG_ASSERT((size % res->hr_local_sectorsize) == 0); in hast_activemap_flush()
345 if (pwrite(res->hr_localfd, buf, size, METADATA_SIZE) != in hast_activemap_flush()
348 res->hr_stat_activemap_write_error++; in hast_activemap_flush()
351 if (ret == 0 && res->hr_metaflush == 1 && in hast_activemap_flush()
352 g_flush(res->hr_localfd) == -1) { in hast_activemap_flush()
355 res->hr_localpath); in hast_activemap_flush()
356 res->hr_metaflush = 0; in hast_activemap_flush()
360 res->hr_stat_activemap_flush_error++; in hast_activemap_flush()
364 mtx_unlock(&res->hr_amp_diskmap_lock); in hast_activemap_flush()
369 real_remote(const struct hast_resource *res) in real_remote() argument
372 return (strcmp(res->hr_remoteaddr, "none") != 0); in real_remote()
376 init_environment(struct hast_resource *res __unused) in init_environment()
504 init_resuid(struct hast_resource *res) in init_resuid() argument
508 if (res->hr_resuid != 0) { in init_resuid()
513 arc4random_buf(&res->hr_resuid, sizeof(res->hr_resuid)); in init_resuid()
515 if (metadata_write(res) == -1) in init_resuid()
522 init_local(struct hast_resource *res) in init_local() argument
527 if (metadata_read(res, true) == -1) in init_local()
529 mtx_init(&res->hr_amp_lock); in init_local()
530 if (activemap_init(&res->hr_amp, res->hr_datasize, res->hr_extentsize, in init_local()
531 res->hr_local_sectorsize, res->hr_keepdirty) == -1) { in init_local()
541 mapsize = activemap_ondisk_size(res->hr_amp); in init_local()
547 if (pread(res->hr_localfd, buf, mapsize, METADATA_SIZE) != in init_local()
551 activemap_copyin(res->hr_amp, buf, mapsize); in init_local()
553 if (res->hr_resuid != 0) in init_local()
562 res->hr_primary_localcnt = 0; in init_local()
563 res->hr_primary_remotecnt = 0; in init_local()
564 if (metadata_write(res) == -1) in init_local()
569 primary_connect(struct hast_resource *res, struct proto_conn **connp) in primary_connect() argument
575 if (proto_send(res->hr_conn, &val, sizeof(val)) == -1) { in primary_connect()
579 if (proto_recv(res->hr_conn, &val, sizeof(val)) == -1) { in primary_connect()
586 res->hr_remoteaddr); in primary_connect()
589 if (proto_connection_recv(res->hr_conn, true, &conn) == -1) { in primary_connect()
593 if (proto_connect_wait(conn, res->hr_timeout) == -1) { in primary_connect()
595 res->hr_remoteaddr); in primary_connect()
600 if (proto_timeout(conn, res->hr_timeout) == -1) in primary_connect()
612 enable_direct_reads(struct hast_resource *res) in enable_direct_reads() argument
618 ggiomodify.gctl_unit = res->hr_ggateunit; in enable_direct_reads()
620 strlcpy(ggiomodify.gctl_readprov, res->hr_localpath, in enable_direct_reads()
622 ggiomodify.gctl_readoffset = res->hr_localoff; in enable_direct_reads()
623 if (ioctl(res->hr_ggatefd, G_GATE_CMD_MODIFY, &ggiomodify) == 0) in enable_direct_reads()
630 init_remote(struct hast_resource *res, struct proto_conn **inp, in init_remote() argument
646 PJDLOG_ASSERT(real_remote(res)); in init_remote()
651 if (primary_connect(res, &out) == -1) in init_remote()
661 nv_add_string(nvout, res->hr_name, "resource"); in init_remote()
666 res->hr_remoteaddr); in init_remote()
670 if (hast_proto_send(res, out, nvout, NULL, 0) == -1) { in init_remote()
673 res->hr_remoteaddr); in init_remote()
681 res->hr_remoteaddr); in init_remote()
704 res->hr_version = version; in init_remote()
705 pjdlog_debug(1, "Negotiated protocol version %d.", res->hr_version); in init_remote()
709 res->hr_remoteaddr); in init_remote()
713 if (size != sizeof(res->hr_token)) { in init_remote()
715 res->hr_remoteaddr, size, sizeof(res->hr_token)); in init_remote()
719 bcopy(token, res->hr_token, sizeof(res->hr_token)); in init_remote()
726 if (primary_connect(res, &in) == -1) in init_remote()
730 nv_add_string(nvout, res->hr_name, "resource"); in init_remote()
731 nv_add_uint8_array(nvout, res->hr_token, sizeof(res->hr_token), in init_remote()
733 if (res->hr_resuid == 0) { in init_remote()
743 if (init_resuid(res)) in init_remote()
746 nv_add_uint64(nvout, res->hr_resuid, "resuid"); in init_remote()
747 nv_add_uint64(nvout, res->hr_primary_localcnt, "localcnt"); in init_remote()
748 nv_add_uint64(nvout, res->hr_primary_remotecnt, "remotecnt"); in init_remote()
752 res->hr_remoteaddr); in init_remote()
756 if (hast_proto_send(res, in, nvout, NULL, 0) == -1) { in init_remote()
759 res->hr_remoteaddr); in init_remote()
767 res->hr_remoteaddr); in init_remote()
777 if (datasize != res->hr_datasize) { in init_remote()
779 (intmax_t)res->hr_datasize, (intmax_t)datasize); in init_remote()
784 if (extentsize != res->hr_extentsize) { in init_remote()
786 (ssize_t)res->hr_extentsize, (ssize_t)extentsize); in init_remote()
790 res->hr_secondary_localcnt = nv_get_uint64(nvin, "localcnt"); in init_remote()
791 res->hr_secondary_remotecnt = nv_get_uint64(nvin, "remotecnt"); in init_remote()
792 res->hr_syncsrc = nv_get_uint8(nvin, "syncsrc"); in init_remote()
793 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) in init_remote()
794 enable_direct_reads(res); in init_remote()
800 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_PRIMARY); in init_remote()
801 PJDLOG_ASSERT(res->hr_secondary_localcnt == 0); in init_remote()
803 if (res->hr_primary_localcnt == 0) { in init_remote()
804 PJDLOG_ASSERT(res->hr_secondary_remotecnt == 0); in init_remote()
807 res->hr_primary_localcnt++; in init_remote()
809 (uintmax_t)res->hr_primary_localcnt); in init_remote()
810 (void)metadata_write(res); in init_remote()
828 if (hast_proto_recv_data(res, out, nvin, map, in init_remote()
836 mtx_lock(&res->hr_amp_lock); in init_remote()
840 activemap_merge(res->hr_amp, map, mapsize); in init_remote()
846 (void)hast_activemap_flush(res); in init_remote()
856 pjdlog_info("Connected to %s.", res->hr_remoteaddr); in init_remote()
857 if (res->hr_original_replication == HAST_REPLICATION_MEMSYNC && in init_remote()
858 res->hr_version < 2) { in init_remote()
860 res->hr_replication = HAST_REPLICATION_FULLSYNC; in init_remote()
861 } else if (res->hr_replication != res->hr_original_replication) { in init_remote()
865 res->hr_replication = res->hr_original_replication; in init_remote()
871 res->hr_remotein = in; in init_remote()
872 res->hr_remoteout = out; in init_remote()
874 event_send(res, EVENT_CONNECT); in init_remote()
878 event_send(res, EVENT_SPLITBRAIN); in init_remote()
906 init_ggate(struct hast_resource *res) in init_ggate() argument
914 res->hr_ggatefd = open("/dev/" G_GATE_CTL_NAME, O_RDWR); in init_ggate()
915 if (res->hr_ggatefd == -1) in init_ggate()
923 ggiocreate.gctl_mediasize = res->hr_datasize; in init_ggate()
924 ggiocreate.gctl_sectorsize = res->hr_local_sectorsize; in init_ggate()
930 res->hr_provname); in init_ggate()
931 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CREATE, &ggiocreate) == 0) { in init_ggate()
932 pjdlog_info("Device hast/%s created.", res->hr_provname); in init_ggate()
933 res->hr_ggateunit = ggiocreate.gctl_unit; in init_ggate()
938 res->hr_provname); in init_ggate()
942 res->hr_provname); in init_ggate()
952 res->hr_provname); in init_ggate()
953 if (ioctl(res->hr_ggatefd, G_GATE_CMD_CANCEL, &ggiocancel) == 0) { in init_ggate()
954 pjdlog_info("Device hast/%s recovered.", res->hr_provname); in init_ggate()
955 res->hr_ggateunit = ggiocancel.gctl_unit; in init_ggate()
959 res->hr_provname); in init_ggate()
963 hastd_primary(struct hast_resource *res) in hastd_primary() argument
973 if (proto_client(NULL, "socketpair://", &res->hr_ctrl) == -1) { in hastd_primary()
982 if (proto_client(NULL, "socketpair://", &res->hr_event) == -1) { in hastd_primary()
992 if (proto_client(NULL, "socketpair://", &res->hr_conn) == -1) { in hastd_primary()
1009 proto_recv(res->hr_event, NULL, 0); in hastd_primary()
1010 proto_recv(res->hr_conn, NULL, 0); in hastd_primary()
1012 proto_send(res->hr_ctrl, NULL, 0); in hastd_primary()
1013 res->hr_workerpid = pid; in hastd_primary()
1017 gres = res; in hastd_primary()
1018 res->output_status_aux = output_status_aux; in hastd_primary()
1023 proto_send(res->hr_event, NULL, 0); in hastd_primary()
1024 proto_send(res->hr_conn, NULL, 0); in hastd_primary()
1026 proto_recv(res->hr_ctrl, NULL, 0); in hastd_primary()
1027 descriptors_cleanup(res); in hastd_primary()
1029 descriptors_assert(res, mode); in hastd_primary()
1033 pjdlog_prefix_set("[%s] (%s) ", res->hr_name, role2str(res->hr_role)); in hastd_primary()
1034 setproctitle("%s (%s)", res->hr_name, role2str(res->hr_role)); in hastd_primary()
1036 init_local(res); in hastd_primary()
1037 init_ggate(res); in hastd_primary()
1038 init_environment(res); in hastd_primary()
1040 if (drop_privs(res) != 0) { in hastd_primary()
1041 cleanup(res); in hastd_primary()
1050 error = pthread_create(&td, NULL, guard_thread, res); in hastd_primary()
1060 error = pthread_create(&td, NULL, ctrl_thread, res); in hastd_primary()
1062 if (real_remote(res)) { in hastd_primary()
1063 error = init_remote(res, NULL, NULL); in hastd_primary()
1071 res->hr_timeout); in hastd_primary()
1074 error = init_remote(res, NULL, NULL); in hastd_primary()
1077 if (time(NULL) > start + res->hr_timeout) in hastd_primary()
1086 error = pthread_create(&td, NULL, ggate_recv_thread, res); in hastd_primary()
1088 error = pthread_create(&td, NULL, local_send_thread, res); in hastd_primary()
1090 error = pthread_create(&td, NULL, remote_send_thread, res); in hastd_primary()
1092 error = pthread_create(&td, NULL, remote_recv_thread, res); in hastd_primary()
1094 error = pthread_create(&td, NULL, ggate_send_thread, res); in hastd_primary()
1097 (void)sync_thread(res); in hastd_primary()
1135 remote_close(struct hast_resource *res, int ncomp) in remote_close() argument
1143 if (!ISCONNECTED(res, ncomp)) { in remote_close()
1144 PJDLOG_ASSERT(res->hr_remotein == NULL); in remote_close()
1145 PJDLOG_ASSERT(res->hr_remoteout == NULL); in remote_close()
1150 PJDLOG_ASSERT(res->hr_remotein != NULL); in remote_close()
1151 PJDLOG_ASSERT(res->hr_remoteout != NULL); in remote_close()
1154 res->hr_remoteaddr); in remote_close()
1155 proto_close(res->hr_remotein); in remote_close()
1156 res->hr_remotein = NULL; in remote_close()
1158 res->hr_remoteaddr); in remote_close()
1159 proto_close(res->hr_remoteout); in remote_close()
1160 res->hr_remoteout = NULL; in remote_close()
1164 pjdlog_warning("Disconnected from %s.", res->hr_remoteaddr); in remote_close()
1171 event_send(res, EVENT_DISCONNECT); in remote_close()
1178 write_complete(struct hast_resource *res, struct hio *hio) in write_complete() argument
1194 if (!ISCONNECTED(res, ncomp)) { in write_complete()
1196 if (res->hr_primary_localcnt == res->hr_secondary_remotecnt) { in write_complete()
1197 res->hr_primary_localcnt++; in write_complete()
1199 (uintmax_t)res->hr_primary_localcnt); in write_complete()
1200 (void)metadata_write(res); in write_complete()
1205 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) in write_complete()
1224 struct hast_resource *res = arg; in ggate_recv_thread() local
1235 ggio->gctl_unit = res->hr_ggateunit; in ggate_recv_thread()
1239 hio->hio_replication = res->hr_replication; in ggate_recv_thread()
1243 if (ioctl(res->hr_ggatefd, G_GATE_CMD_START, ggio) == -1) { in ggate_recv_thread()
1289 res->hr_stat_read++; in ggate_recv_thread()
1292 if (res->hr_syncsrc == HAST_SYNCSRC_UNDEF || in ggate_recv_thread()
1293 res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in ggate_recv_thread()
1302 PJDLOG_ASSERT(res->hr_syncsrc == in ggate_recv_thread()
1314 res->hr_stat_write++; in ggate_recv_thread()
1315 if (res->hr_resuid == 0 && in ggate_recv_thread()
1316 res->hr_primary_localcnt == 0) { in ggate_recv_thread()
1318 res->hr_primary_localcnt = 1; in ggate_recv_thread()
1347 mtx_lock(&res->hr_amp_lock); in ggate_recv_thread()
1348 if (activemap_write_start(res->hr_amp, in ggate_recv_thread()
1350 res->hr_stat_activemap_update++; in ggate_recv_thread()
1351 (void)hast_activemap_flush(res); in ggate_recv_thread()
1353 mtx_unlock(&res->hr_amp_lock); in ggate_recv_thread()
1361 res->hr_stat_delete++; in ggate_recv_thread()
1364 res->hr_stat_flush++; in ggate_recv_thread()
1384 struct hast_resource *res = arg; in local_send_thread() local
1402 ret = pread(res->hr_localfd, ggio->gctl_data, in local_send_thread()
1404 ggio->gctl_offset + res->hr_localoff); in local_send_thread()
1425 ret = pwrite(res->hr_localfd, ggio->gctl_data, in local_send_thread()
1427 ggio->gctl_offset + res->hr_localoff); in local_send_thread()
1442 write_complete(res, hio); in local_send_thread()
1447 ret = g_delete(res->hr_localfd, in local_send_thread()
1448 ggio->gctl_offset + res->hr_localoff, in local_send_thread()
1460 if (!res->hr_localflush) { in local_send_thread()
1465 ret = g_flush(res->hr_localfd); in local_send_thread()
1468 res->hr_localflush = false; in local_send_thread()
1480 write_complete(res, hio); in local_send_thread()
1502 keepalive_send(struct hast_resource *res, unsigned int ncomp) in keepalive_send() argument
1508 if (!ISCONNECTED(res, ncomp)) { in keepalive_send()
1513 PJDLOG_ASSERT(res->hr_remotein != NULL); in keepalive_send()
1514 PJDLOG_ASSERT(res->hr_remoteout != NULL); in keepalive_send()
1525 if (hast_proto_send(res, res->hr_remoteout, nv, NULL, 0) == -1) { in keepalive_send()
1530 remote_close(res, ncomp); in keepalive_send()
1545 struct hast_resource *res = arg; in remote_send_thread() local
1566 keepalive_send(res, ncomp); in remote_send_thread()
1623 if (!ISCONNECTED(res, ncomp)) { in remote_send_thread()
1641 if (hast_proto_send(res, res->hr_remoteout, nv, data, in remote_send_thread()
1650 remote_close(res, ncomp); in remote_send_thread()
1670 mtx_lock(&res->hr_amp_lock); in remote_send_thread()
1671 if (activemap_need_sync(res->hr_amp, ggio->gctl_offset, in remote_send_thread()
1673 (void)hast_activemap_flush(res); in remote_send_thread()
1675 mtx_unlock(&res->hr_amp_lock); in remote_send_thread()
1680 write_complete(res, hio); in remote_send_thread()
1702 struct hast_resource *res = arg; in remote_recv_thread() local
1727 if (!ISCONNECTED(res, ncomp)) { in remote_recv_thread()
1743 if (hast_proto_recv_hdr(res->hr_remotein, &nv) == -1) { in remote_recv_thread()
1747 remote_close(res, ncomp); in remote_recv_thread()
1787 if (!ISCONNECTED(res, ncomp)) { in remote_recv_thread()
1792 if (hast_proto_recv_data(res, res->hr_remotein, nv, in remote_recv_thread()
1799 remote_close(res, ncomp); in remote_recv_thread()
1821 write_complete(res, hio); in remote_recv_thread()
1864 struct hast_resource *res = arg; in ggate_send_thread() local
1893 res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) in ggate_send_thread()
1899 mtx_lock(&res->hr_amp_lock); in ggate_send_thread()
1900 if (activemap_write_complete(res->hr_amp, in ggate_send_thread()
1902 res->hr_stat_activemap_update++; in ggate_send_thread()
1903 (void)hast_activemap_flush(res); in ggate_send_thread()
1905 mtx_unlock(&res->hr_amp_lock); in ggate_send_thread()
1919 write_complete(res, hio); in ggate_send_thread()
1921 if (ioctl(res->hr_ggatefd, G_GATE_CMD_DONE, ggio) == -1) { in ggate_send_thread()
1929 res->hr_stat_read_error++; in ggate_send_thread()
1932 res->hr_stat_write_error++; in ggate_send_thread()
1935 res->hr_stat_delete_error++; in ggate_send_thread()
1938 res->hr_stat_flush_error++; in ggate_send_thread()
1956 struct hast_resource *res = arg; in sync_thread() local
1979 event_send(res, EVENT_SYNCINTR); in sync_thread()
1991 mtx_lock(&res->hr_amp_lock); in sync_thread()
1993 activemap_sync_rewind(res->hr_amp); in sync_thread()
1994 offset = activemap_sync_offset(res->hr_amp, &length, &syncext); in sync_thread()
2000 if (activemap_extent_complete(res->hr_amp, syncext)) in sync_thread()
2001 (void)hast_activemap_flush(res); in sync_thread()
2003 mtx_unlock(&res->hr_amp_lock); in sync_thread()
2005 mtx_unlock(&res->hr_amp_lock); in sync_thread()
2013 (intmax_t)(res->hr_extentsize * in sync_thread()
2014 activemap_ndirty(res->hr_amp))); in sync_thread()
2015 event_send(res, EVENT_SYNCSTART); in sync_thread()
2028 if (ISCONNECTED(res, ncomp)) { in sync_thread()
2041 event_send(res, EVENT_SYNCDONE); in sync_thread()
2044 if (res->hr_syncsrc == HAST_SYNCSRC_SECONDARY) in sync_thread()
2046 res->hr_syncsrc = HAST_SYNCSRC_UNDEF; in sync_thread()
2047 res->hr_primary_localcnt = in sync_thread()
2048 res->hr_secondary_remotecnt; in sync_thread()
2049 res->hr_primary_remotecnt = in sync_thread()
2050 res->hr_secondary_localcnt; in sync_thread()
2053 (uintmax_t)res->hr_primary_localcnt, in sync_thread()
2054 (uintmax_t)res->hr_primary_remotecnt); in sync_thread()
2055 (void)metadata_write(res); in sync_thread()
2061 enable_direct_reads(res); in sync_thread()
2105 hio->hio_replication = res->hr_replication; in sync_thread()
2113 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in sync_thread()
2121 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); in sync_thread()
2160 if (res->hr_syncsrc == HAST_SYNCSRC_PRIMARY) { in sync_thread()
2168 PJDLOG_ASSERT(res->hr_syncsrc == HAST_SYNCSRC_SECONDARY); in sync_thread()
2213 primary_config_reload(struct hast_resource *res, struct nv *nv) in primary_config_reload() argument
2221 PJDLOG_ASSERT(res->hr_role == HAST_ROLE_PRIMARY); in primary_config_reload()
2222 PJDLOG_ASSERT(gres == res); in primary_config_reload()
2341 guard_one(struct hast_resource *res, unsigned int ncomp) in guard_one() argument
2350 if (!real_remote(res)) { in guard_one()
2355 if (ISCONNECTED(res, ncomp)) { in guard_one()
2356 PJDLOG_ASSERT(res->hr_remotein != NULL); in guard_one()
2357 PJDLOG_ASSERT(res->hr_remoteout != NULL); in guard_one()
2360 res->hr_remoteaddr); in guard_one()
2364 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2365 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2372 res->hr_remoteaddr); in guard_one()
2374 if (init_remote(res, &in, &out) == 0) { in guard_one()
2376 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2377 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2379 res->hr_remotein = in; in guard_one()
2380 res->hr_remoteout = out; in guard_one()
2383 res->hr_remoteaddr); in guard_one()
2387 PJDLOG_ASSERT(res->hr_remotein == NULL); in guard_one()
2388 PJDLOG_ASSERT(res->hr_remoteout == NULL); in guard_one()
2391 res->hr_remoteaddr); in guard_one()
2402 struct hast_resource *res = arg; in guard_thread() local
2442 guard_one(res, ii); in guard_thread()